mcp_service.py•12 kB
"""
MCP 主服務 gRPC 實作
實作 MnemosyneMCP 服務的核心業務邏輯。
"""
import uuid
from typing import Any, Dict, Optional
import grpc
import structlog
from google.protobuf import timestamp_pb2
from ..core.config import Settings
from ..drivers.falkordb_driver import FalkorDBDriver
from ..interfaces.graph_store import ConnectionConfig
from ..llm.providers.openai_provider import OpenAIProvider
from .generated import mcp_pb2, mcp_pb2_grpc
logger = structlog.get_logger(__name__)
class MnemosyneMCPService(mcp_pb2_grpc.MnemosyneMCPServicer):
"""
MnemosyneMCP 主服務實作
提供核心的搜索、分析和管理功能。
"""
def __init__(self, settings: Settings):
"""
初始化服務
Args:
settings: 應用配置
"""
self.settings = settings
self.logger = structlog.get_logger(f"{__name__}.{self.__class__.__name__}")
# 初始化 FalkorDB 驅動器
self.driver = FalkorDBDriver(
ConnectionConfig(
host=settings.database.host,
port=settings.database.port,
database=settings.database.database,
username=settings.database.username,
password=settings.database.password,
)
)
# 初始化 LLM Provider
self.llm_provider: Optional[OpenAIProvider] = None
# 服務統計
self.stats = {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"search_requests": 0,
"impact_analysis_requests": 0,
}
async def initialize(self) -> None:
"""初始化服務組件"""
try:
# 連接資料庫
if not self.driver.is_connected:
await self.driver.connect()
self.logger.info("Database connected successfully")
# 初始化 LLM Provider
if not self.llm_provider:
# TODO: 從 Settings 中獲取 LLM 配置
llm_config = {
"model": "gpt-3.5-turbo",
"temperature": 0.7,
"max_tokens": 1000,
# 實際部署時需要設定 API_KEY
"api_key": (
self.settings.security.api_key
if hasattr(self.settings.security, "api_key")
else None
),
}
self.llm_provider = OpenAIProvider(llm_config)
await self.llm_provider.initialize()
self.logger.info("LLM Provider initialized successfully")
except Exception as e:
self.logger.error("Failed to initialize service", error=str(e))
raise
async def cleanup(self) -> None:
"""清理服務資源"""
try:
if self.driver and self.driver.is_connected:
await self.driver.disconnect()
self.logger.info("Database disconnected")
except Exception as e:
self.logger.error("Error during cleanup", error=str(e))
def HealthCheck(self, request, context):
"""系統健康檢查 - 純連線測試"""
self.stats["total_requests"] += 1
try:
# 純粹的健康檢查,不涉及業務邏輯
self.logger.debug("HealthCheck called")
# 創建回應時間戳
timestamp = timestamp_pb2.Timestamp()
timestamp.GetCurrentTime()
response = mcp_pb2.HealthCheckResponse(
status=mcp_pb2.HealthCheckResponse.Status.SERVING,
message="gRPC service is healthy",
timestamp=timestamp,
)
self.stats["successful_requests"] += 1
return response
except Exception as e:
self.stats["failed_requests"] += 1
self.logger.error("HealthCheck failed", error=str(e))
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"Health check failed: {str(e)}")
# 返回不健康狀態
timestamp = timestamp_pb2.Timestamp()
timestamp.GetCurrentTime()
return mcp_pb2.HealthCheckResponse(
status=mcp_pb2.HealthCheckResponse.Status.NOT_SERVING,
message=f"Service unhealthy: {str(e)}",
timestamp=timestamp,
)
def IngestProject(self, request, context):
"""數據導入與管理 - 導入專案"""
self.stats["total_requests"] += 1
try:
self.logger.info("IngestProject called", project_id=request.project_id)
# TODO: 實作專案導入邏輯
# 這將在後續的 ECL 整合中實作
task_id = f"task_{uuid.uuid4()}"
self.stats["successful_requests"] += 1
return mcp_pb2.IngestProjectResponse(task_id=task_id)
except Exception as e:
self.stats["failed_requests"] += 1
self.logger.error("IngestProject failed", error=str(e))
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"Internal error: {str(e)}")
return mcp_pb2.IngestProjectResponse()
def GetIngestStatus(self, request, context):
"""獲取導入狀態"""
self.stats["total_requests"] += 1
try:
self.logger.info("GetIngestStatus called", task_id=request.task_id)
# TODO: 實作狀態查詢邏輯
self.stats["successful_requests"] += 1
return mcp_pb2.IngestStatus(
task_id=request.task_id,
status=mcp_pb2.IngestStatus.Status.COMPLETED,
message="Task completed successfully",
)
except Exception as e:
self.stats["failed_requests"] += 1
self.logger.error("GetIngestStatus failed", error=str(e))
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"Internal error: {str(e)}")
return mcp_pb2.IngestStatus()
def Search(self, request, context):
"""查詢與分析 - 搜索功能"""
self.stats["total_requests"] += 1
self.stats["search_requests"] += 1
try:
self.logger.info(
"Search called", query=request.query_text, top_k=request.top_k
)
# TODO: 實作搜索邏輯
# 這將在主線3中實作混合檢索功能
# 暫時返回空結果
response = mcp_pb2.SearchResponse(
summary="Search functionality is under development",
relevant_nodes=[],
subgraph=mcp_pb2.Graph(nodes=[], edges=[], metadata={}),
suggested_next_step="Please wait for implementation completion",
)
self.stats["successful_requests"] += 1
return response
except Exception as e:
self.stats["failed_requests"] += 1
self.logger.error("Search failed", error=str(e))
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"Internal error: {str(e)}")
return mcp_pb2.SearchResponse()
def RunImpactAnalysis(self, request, context):
"""運行影響力分析"""
self.stats["total_requests"] += 1
self.stats["impact_analysis_requests"] += 1
try:
self.logger.info(
"RunImpactAnalysis called",
project_id=request.project_id,
pr_number=request.pr_number,
)
# TODO: 實作影響力分析邏輯
# 這將在主線4中實作
# 暫時返回空結果
response = mcp_pb2.ImpactAnalysisResponse(
summary="Impact analysis functionality is under development",
risk_level=mcp_pb2.ImpactAnalysisResponse.RiskLevel.LOW,
impact_subgraph=mcp_pb2.Graph(nodes=[], edges=[], metadata={}),
)
self.stats["successful_requests"] += 1
return response
except Exception as e:
self.stats["failed_requests"] += 1
self.logger.error("RunImpactAnalysis failed", error=str(e))
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"Internal error: {str(e)}")
return mcp_pb2.ImpactAnalysisResponse()
def ApplyConstraint(self, request, context):
"""應用約束(Sprint 4 功能)"""
self.stats["total_requests"] += 1
try:
self.logger.info("ApplyConstraint called")
# Sprint 4 功能,暫時返回未實作
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("ApplyConstraint will be implemented in Sprint 4")
return mcp_pb2.ApplyConstraintResponse()
except Exception as e:
self.stats["failed_requests"] += 1
self.logger.error("ApplyConstraint failed", error=str(e))
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"Internal error: {str(e)}")
return mcp_pb2.ApplyConstraintResponse()
def AcquireLock(self, request, context):
"""獲取鎖定(Sprint 4 功能)"""
self.stats["total_requests"] += 1
try:
self.logger.info("AcquireLock called")
# Sprint 4 功能,暫時返回未實作
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("AcquireLock will be implemented in Sprint 4")
return mcp_pb2.AcquireLockResponse()
except Exception as e:
self.stats["failed_requests"] += 1
self.logger.error("AcquireLock failed", error=str(e))
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"Internal error: {str(e)}")
return mcp_pb2.AcquireLockResponse()
def ReleaseLock(self, request, context):
"""釋放鎖定(Sprint 4 功能)"""
self.stats["total_requests"] += 1
try:
self.logger.info("ReleaseLock called")
# Sprint 4 功能,暫時返回未實作
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("ReleaseLock will be implemented in Sprint 4")
return mcp_pb2.ReleaseLockResponse()
except Exception as e:
self.stats["failed_requests"] += 1
self.logger.error("ReleaseLock failed", error=str(e))
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"Internal error: {str(e)}")
return mcp_pb2.ReleaseLockResponse()
def GetGraphVisualization(self, request, context):
"""獲取圖形可視化"""
self.stats["total_requests"] += 1
try:
self.logger.info(
"GetGraphVisualization called", project_id=request.project_id
)
# TODO: 實作圖形可視化邏輯
response = mcp_pb2.GraphVisualization(
graph=mcp_pb2.Graph(nodes=[], edges=[], metadata={}),
layout_data="{}",
)
self.stats["successful_requests"] += 1
return response
except Exception as e:
self.stats["failed_requests"] += 1
self.logger.error("GetGraphVisualization failed", error=str(e))
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"Internal error: {str(e)}")
return mcp_pb2.GraphVisualization()
def get_service_stats(self) -> Dict[str, Any]:
"""獲取服務統計信息"""
return {
**self.stats,
"success_rate": (
self.stats["successful_requests"]
/ max(self.stats["total_requests"], 1)
* 100
),
}