"""
优化版文件夹文档生成MCP服务器主文件
这是MCP服务器的核心实现,集成了所有中间件、服务和工具。
"""
import asyncio
import logging
logger = logging.getLogger(__name__)
import sys
from pathlib import Path
from typing import Optional, Dict, Any
from mcp.server.fastmcp import FastMCP
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from data_access import (
FileSystemInterface,
CacheInterface,
ConfigInterface,
SecurityValidator
)
from middleware import (
auth_middleware,
logging_middleware,
error_handler_middleware
)
from services import (
DocumentService,
AnalysisService,
TemplateService,
CacheService
)
from tools import register_all_tools
from utils import (
ConfigManager,
create_default_config_manager,
InteractionManager,
set_interaction_level,
InteractionLevel,
create_task,
start_task,
complete_task,
fail_task,
get_progress_summary,
default_logger,
LogLevel,
LoggerCategory,
performance_monitor,
memory_usage_monitor,
save_performance_report,
get_current_performance_status
)
class OptimizedFolderDocMCPServer:
    """Optimized folder-documentation MCP server.

    Wires together configuration, the data-access layer (file system, cache,
    security validator), the service layer (cache, template, document and
    analysis services) and a FastMCP instance, and exposes ``start``,
    ``stop`` and ``health_check`` coroutines.
    """

    def __init__(self, config_path: Optional[str] = None, interaction_level: InteractionLevel = InteractionLevel.NORMAL):
        """Build every component of the server and register the MCP tools.

        Args:
            config_path: Path to the configuration file. When falsy, the
                config manager falls back to "config/user_config.yaml";
                the value is passed unchanged (possibly None) to
                ``ConfigInterface.load_config``.
            interaction_level: Interaction level handed to
                ``set_interaction_level``.
        """
        # Interaction level
        self.interaction = InteractionManager()
        set_interaction_level(interaction_level)

        # Configuration manager (used by the interactive configuration flow)
        self.config_manager = create_default_config_manager(config_path or "config/user_config.yaml")

        # Load the configuration object the remaining components read from
        self.config_interface = ConfigInterface()
        self.config = self.config_interface.load_config(config_path)

        # Data-access layer
        self.security_validator = SecurityValidator()
        self.file_system = FileSystemInterface(self.security_validator)
        self.cache = CacheInterface(
            memory_size=self.config.performance.cache_memory_size,
            disk_cache_dir=self.config.performance.cache_disk_dir,
            disk_max_size=self.config.performance.cache_disk_max_size,
            enable_disk_cache=self.config.performance.cache_disk_enabled
        )

        # Service layer
        self.cache_service = CacheService(self.cache)
        self.template_service = TemplateService(
            template_dir=self.config_dir / "config/templates"
        )
        self.document_service = DocumentService(
            self.file_system,
            self.template_service,
            self.cache_service
        )
        self.analysis_service = AnalysisService(
            self.file_system,
            self.cache_service
        )

        # FastMCP server instance
        self.mcp = FastMCP("optimized_folder_docs_mcp")

        # Middleware hook (currently a no-op, see _setup_middleware)
        self._setup_middleware()

        # Register the MCP tools
        self._register_tools()

        # Restrict the auth middleware to the configured paths, if any
        if self.config.security.allowed_paths:
            auth_middleware.allowed_paths = set(self.config.security.allowed_paths)

    @property
    def config_dir(self) -> Path:
        """Return the base directory: the parent of this file's directory."""
        return Path(__file__).parent.parent

    def _setup_middleware(self):
        """Set up middleware.

        Intentionally empty: the middleware logic is integrated into the
        individual services.
        """
        pass

    def _register_tools(self):
        """Register all MCP tools on the FastMCP instance."""
        register_all_tools(
            self.mcp,
            self.document_service,
            self.analysis_service,
            self.template_service,
            self.file_system,
            self.cache_service
        )

    async def start(self):
        """Start the MCP server and serve until it exits.

        Records startup progress under the task id "server_startup", logs
        the startup event, then runs the FastMCP server over stdio.

        Raises:
            RuntimeError, ValueError: Re-raised after the failure has been
                recorded and logged.
        """
        task_id = "server_startup"
        try:
            # Track startup progress
            create_task(task_id, "MCP服务器启动")
            start_task(task_id)
            logging_middleware.logger.info(
                "MCP服务器启动",
                version="2.0.0",
                config=str(self.config_dir / "config" / "default.yaml")
            )
            complete_task(task_id, "服务器初始化完成")
            # FastMCP.run() is a synchronous entry point that starts its own
            # event loop, so it can neither be awaited nor called from inside
            # the loop this coroutine already runs in. run_stdio_async() is
            # the coroutine behind run()'s default "stdio" transport.
            await self.mcp.run_stdio_async()
        except KeyboardInterrupt:
            fail_task(task_id, "收到停止信号")
            logging_middleware.logger.info("MCP服务器收到停止信号")
        except (RuntimeError, ValueError) as e:
            fail_task(task_id, f"启动失败: {str(e)}")
            logging_middleware.logger.error(
                "MCP服务器启动失败",
                error=str(e),
                error_type=type(e).__name__
            )
            raise

    async def stop(self):
        """Stop the MCP server.

        Clears the cache and asks the auth middleware to clean up expired
        sessions. RuntimeError and ValueError are logged, not propagated.
        """
        try:
            # Clear the cache
            self.cache.clear()
            # Clean up sessions
            auth_middleware.cleanup_expired_sessions()
            logging_middleware.logger.info("MCP服务器已停止")
        except (RuntimeError, ValueError) as e:
            logging_middleware.logger.error(
                "MCP服务器停止时出错",
                error=str(e)
            )

    async def health_check(self) -> Dict[str, Any]:
        """Run a health check.

        Returns:
            A dict with "status" ("healthy" or "unhealthy") and "timestamp".
            A healthy report also carries "version" and "components"
            (cache stats, whether a config is loaded, file system); an
            unhealthy one carries "error" with the exception message.
        """
        try:
            # Only the cache is actually probed; the other component
            # statuses are reported as fixed values.
            cache_stats = self.cache.get_stats()
            health = {
                "status": "healthy",
                "version": "2.0.0",
                "components": {
                    "cache": {
                        "status": "healthy",
                        "stats": cache_stats
                    },
                    "config": {
                        "status": "healthy",
                        "config_loaded": self.config is not None
                    },
                    "file_system": {
                        "status": "healthy"
                    }
                },
                # NOTE(review): this reads "timestamp" from whatever
                # logger.bind() returns; whether that object supports
                # .get() depends on logging_middleware, which is not
                # visible here. Confirm this yields a real timestamp.
                "timestamp": logging_middleware.logger.bind().get("timestamp", "unknown")
            }
            return health
        except (RuntimeError, ValueError) as e:
            return {
                "status": "unhealthy",
                "error": str(e),
                "timestamp": "unknown"
            }
async def main():
    """Command-line entry point.

    Parses the CLI options, builds the server, and then either runs the
    interactive configuration, performs a health check (exiting with
    status 0 when healthy and 1 when unhealthy), or starts the server.
    Whenever the health-check or start path is taken, the server is
    stopped on the way out.
    """
    import argparse

    cli = argparse.ArgumentParser(description="优化版文件夹文档生成MCP服务器")
    cli.add_argument("--config", type=str, help="配置文件路径", default=None)
    cli.add_argument("--health-check", action="store_true", help="执行健康检查并退出")
    cli.add_argument("--interactive-config", action="store_true", help="运行交互式配置")
    cli.add_argument(
        "--interaction-level",
        type=str,
        choices=["silent", "basic", "normal", "verbose", "debug"],
        default="normal",
        help="交互级别",
    )
    options = cli.parse_args()

    # Convert the level name to the enum, then build the server with it.
    server = OptimizedFolderDocMCPServer(
        options.config,
        InteractionLevel(options.interaction_level),
    )

    # Interactive configuration is a standalone mode: run it and leave
    # without starting (or stopping) the server.
    if options.interactive_config:
        server.config_manager.interactive_config()
        return

    try:
        if not options.health_check:
            # Normal mode: serve until the server exits.
            await server.start()
        else:
            # Health-check mode: print the report and exit with its status.
            report = await server.health_check()
            print("健康检查结果:")
            print(f"状态: {report['status']}")
            unhealthy = report['status'] == 'unhealthy'
            if unhealthy:
                print(f"错误: {report['error']}")
            else:
                print("所有组件运行正常")
            sys.exit(1 if unhealthy else 0)
    except KeyboardInterrupt:
        print("\n收到停止信号,正在关闭服务器...")
    except (RuntimeError, ValueError) as e:
        print(f"服务器运行出错: {e}")
        sys.exit(1)
    finally:
        # Runs on every exit from the try block, including sys.exit().
        await server.stop()
if __name__ == "__main__":
    # Script entry point: run the async main() on a fresh event loop.
    asyncio.run(main())