Mnemosyne MCP

by MumuTW
mcp_service.py (12 kB)
""" MCP 主服務 gRPC 實作 實作 MnemosyneMCP 服務的核心業務邏輯。 """ import uuid from typing import Any, Dict, Optional import grpc import structlog from google.protobuf import timestamp_pb2 from ..core.config import Settings from ..drivers.falkordb_driver import FalkorDBDriver from ..interfaces.graph_store import ConnectionConfig from ..llm.providers.openai_provider import OpenAIProvider from .generated import mcp_pb2, mcp_pb2_grpc logger = structlog.get_logger(__name__) class MnemosyneMCPService(mcp_pb2_grpc.MnemosyneMCPServicer): """ MnemosyneMCP 主服務實作 提供核心的搜索、分析和管理功能。 """ def __init__(self, settings: Settings): """ 初始化服務 Args: settings: 應用配置 """ self.settings = settings self.logger = structlog.get_logger(f"{__name__}.{self.__class__.__name__}") # 初始化 FalkorDB 驅動器 self.driver = FalkorDBDriver( ConnectionConfig( host=settings.database.host, port=settings.database.port, database=settings.database.database, username=settings.database.username, password=settings.database.password, ) ) # 初始化 LLM Provider self.llm_provider: Optional[OpenAIProvider] = None # 服務統計 self.stats = { "total_requests": 0, "successful_requests": 0, "failed_requests": 0, "search_requests": 0, "impact_analysis_requests": 0, } async def initialize(self) -> None: """初始化服務組件""" try: # 連接資料庫 if not self.driver.is_connected: await self.driver.connect() self.logger.info("Database connected successfully") # 初始化 LLM Provider if not self.llm_provider: # TODO: 從 Settings 中獲取 LLM 配置 llm_config = { "model": "gpt-3.5-turbo", "temperature": 0.7, "max_tokens": 1000, # 實際部署時需要設定 API_KEY "api_key": ( self.settings.security.api_key if hasattr(self.settings.security, "api_key") else None ), } self.llm_provider = OpenAIProvider(llm_config) await self.llm_provider.initialize() self.logger.info("LLM Provider initialized successfully") except Exception as e: self.logger.error("Failed to initialize service", error=str(e)) raise async def cleanup(self) -> None: """清理服務資源""" try: if self.driver and self.driver.is_connected: await self.driver.disconnect() self.logger.info("Database disconnected") except Exception as e: self.logger.error("Error during cleanup", error=str(e)) def HealthCheck(self, request, context): """系統健康檢查 - 純連線測試""" self.stats["total_requests"] += 1 try: # 純粹的健康檢查,不涉及業務邏輯 self.logger.debug("HealthCheck called") # 創建回應時間戳 timestamp = timestamp_pb2.Timestamp() timestamp.GetCurrentTime() response = mcp_pb2.HealthCheckResponse( status=mcp_pb2.HealthCheckResponse.Status.SERVING, message="gRPC service is healthy", timestamp=timestamp, ) self.stats["successful_requests"] += 1 return response except Exception as e: self.stats["failed_requests"] += 1 self.logger.error("HealthCheck failed", error=str(e)) context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"Health check failed: {str(e)}") # 返回不健康狀態 timestamp = timestamp_pb2.Timestamp() timestamp.GetCurrentTime() return mcp_pb2.HealthCheckResponse( status=mcp_pb2.HealthCheckResponse.Status.NOT_SERVING, message=f"Service unhealthy: {str(e)}", timestamp=timestamp, ) def IngestProject(self, request, context): """數據導入與管理 - 導入專案""" self.stats["total_requests"] += 1 try: self.logger.info("IngestProject called", project_id=request.project_id) # TODO: 實作專案導入邏輯 # 這將在後續的 ECL 整合中實作 task_id = f"task_{uuid.uuid4()}" self.stats["successful_requests"] += 1 return mcp_pb2.IngestProjectResponse(task_id=task_id) except Exception as e: self.stats["failed_requests"] += 1 self.logger.error("IngestProject failed", error=str(e)) context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"Internal error: 
{str(e)}") return mcp_pb2.IngestProjectResponse() def GetIngestStatus(self, request, context): """獲取導入狀態""" self.stats["total_requests"] += 1 try: self.logger.info("GetIngestStatus called", task_id=request.task_id) # TODO: 實作狀態查詢邏輯 self.stats["successful_requests"] += 1 return mcp_pb2.IngestStatus( task_id=request.task_id, status=mcp_pb2.IngestStatus.Status.COMPLETED, message="Task completed successfully", ) except Exception as e: self.stats["failed_requests"] += 1 self.logger.error("GetIngestStatus failed", error=str(e)) context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"Internal error: {str(e)}") return mcp_pb2.IngestStatus() def Search(self, request, context): """查詢與分析 - 搜索功能""" self.stats["total_requests"] += 1 self.stats["search_requests"] += 1 try: self.logger.info( "Search called", query=request.query_text, top_k=request.top_k ) # TODO: 實作搜索邏輯 # 這將在主線3中實作混合檢索功能 # 暫時返回空結果 response = mcp_pb2.SearchResponse( summary="Search functionality is under development", relevant_nodes=[], subgraph=mcp_pb2.Graph(nodes=[], edges=[], metadata={}), suggested_next_step="Please wait for implementation completion", ) self.stats["successful_requests"] += 1 return response except Exception as e: self.stats["failed_requests"] += 1 self.logger.error("Search failed", error=str(e)) context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"Internal error: {str(e)}") return mcp_pb2.SearchResponse() def RunImpactAnalysis(self, request, context): """運行影響力分析""" self.stats["total_requests"] += 1 self.stats["impact_analysis_requests"] += 1 try: self.logger.info( "RunImpactAnalysis called", project_id=request.project_id, pr_number=request.pr_number, ) # TODO: 實作影響力分析邏輯 # 這將在主線4中實作 # 暫時返回空結果 response = mcp_pb2.ImpactAnalysisResponse( summary="Impact analysis functionality is under development", risk_level=mcp_pb2.ImpactAnalysisResponse.RiskLevel.LOW, impact_subgraph=mcp_pb2.Graph(nodes=[], edges=[], metadata={}), ) self.stats["successful_requests"] += 1 return response except Exception as e: self.stats["failed_requests"] += 1 self.logger.error("RunImpactAnalysis failed", error=str(e)) context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"Internal error: {str(e)}") return mcp_pb2.ImpactAnalysisResponse() def ApplyConstraint(self, request, context): """應用約束(Sprint 4 功能)""" self.stats["total_requests"] += 1 try: self.logger.info("ApplyConstraint called") # Sprint 4 功能,暫時返回未實作 context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("ApplyConstraint will be implemented in Sprint 4") return mcp_pb2.ApplyConstraintResponse() except Exception as e: self.stats["failed_requests"] += 1 self.logger.error("ApplyConstraint failed", error=str(e)) context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"Internal error: {str(e)}") return mcp_pb2.ApplyConstraintResponse() def AcquireLock(self, request, context): """獲取鎖定(Sprint 4 功能)""" self.stats["total_requests"] += 1 try: self.logger.info("AcquireLock called") # Sprint 4 功能,暫時返回未實作 context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("AcquireLock will be implemented in Sprint 4") return mcp_pb2.AcquireLockResponse() except Exception as e: self.stats["failed_requests"] += 1 self.logger.error("AcquireLock failed", error=str(e)) context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"Internal error: {str(e)}") return mcp_pb2.AcquireLockResponse() def ReleaseLock(self, request, context): """釋放鎖定(Sprint 4 功能)""" self.stats["total_requests"] += 1 try: self.logger.info("ReleaseLock called") # Sprint 4 功能,暫時返回未實作 
context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("ReleaseLock will be implemented in Sprint 4") return mcp_pb2.ReleaseLockResponse() except Exception as e: self.stats["failed_requests"] += 1 self.logger.error("ReleaseLock failed", error=str(e)) context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"Internal error: {str(e)}") return mcp_pb2.ReleaseLockResponse() def GetGraphVisualization(self, request, context): """獲取圖形可視化""" self.stats["total_requests"] += 1 try: self.logger.info( "GetGraphVisualization called", project_id=request.project_id ) # TODO: 實作圖形可視化邏輯 response = mcp_pb2.GraphVisualization( graph=mcp_pb2.Graph(nodes=[], edges=[], metadata={}), layout_data="{}", ) self.stats["successful_requests"] += 1 return response except Exception as e: self.stats["failed_requests"] += 1 self.logger.error("GetGraphVisualization failed", error=str(e)) context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"Internal error: {str(e)}") return mcp_pb2.GraphVisualization() def get_service_stats(self) -> Dict[str, Any]: """獲取服務統計信息""" return { **self.stats, "success_rate": ( self.stats["successful_requests"] / max(self.stats["total_requests"], 1) * 100 ), }
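The file above only defines the RPC handlers; the server bootstrap is not shown on this page. The snippet below is a minimal sketch of how such a servicer is typically registered with a thread-pool-backed gRPC server, not the project's actual entry point: the absolute import paths (mnemosyne.*), the argument-free Settings() constructor, the add_MnemosyneMCPServicer_to_server helper name (the usual protoc naming convention), and port 50051 are all assumptions.

# Hypothetical bootstrap sketch -- not part of the repository.
# Assumes: "mnemosyne.*" package paths, a no-argument Settings() constructor,
# and the protoc-conventional add_MnemosyneMCPServicer_to_server helper
# in the generated mcp_pb2_grpc module.
import asyncio
from concurrent import futures

import grpc

from mnemosyne.core.config import Settings                  # assumed path
from mnemosyne.mcp.mcp_service import MnemosyneMCPService   # assumed path
from mnemosyne.mcp.generated import mcp_pb2_grpc            # assumed path


def serve(port: int = 50051) -> None:
    service = MnemosyneMCPService(Settings())

    # initialize() is a coroutine (DB connection + LLM provider),
    # so drive it to completion before accepting traffic.
    asyncio.run(service.initialize())

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    mcp_pb2_grpc.add_MnemosyneMCPServicer_to_server(service, server)
    server.add_insecure_port(f"[::]:{port}")
    server.start()
    try:
        server.wait_for_termination()
    finally:
        # Drop the database connection on shutdown.
        asyncio.run(service.cleanup())


if __name__ == "__main__":
    serve()

Because the handlers are plain synchronous def methods, a thread-pool-backed grpc.server is sufficient in this sketch; switching to a grpc.aio server would require rewriting the handlers as coroutines.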

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MumuTW/Mnemosyne-mcp'
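The same lookup can be scripted. The snippet below is a small illustrative example using only Python's standard library and the GET endpoint shown in the curl command above; since the response format is not documented on this page, the body is printed verbatim.

# Illustrative only: fetch the directory entry from the endpoint shown above.
from urllib.request import urlopen

URL = "https://glama.ai/api/mcp/v1/servers/MumuTW/Mnemosyne-mcp"

with urlopen(URL) as resp:
    # The response schema is not documented here, so print the raw body.
    print(resp.read().decode("utf-8"))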

If you have feedback or need assistance with the MCP directory API, please join our Discord server.