Skip to main content
Glama
mcp_client.py9.84 kB
""" MCP Client 관리자 langchain-mcp-adapters의 MultiServerMCPClient를 래핑하여 사용하기 쉽게 만듭니다. """ import asyncio import logging import uuid from typing import Dict, Any, Optional, List from contextlib import asynccontextmanager from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type from langchain_mcp_adapters.client import MultiServerMCPClient from langchain.tools import BaseTool from mcp_client.config import config logger = logging.getLogger(__name__) class MCPClientManager: """ MCP 서버 연결을 관리하는 클래스 langchain-mcp-adapters의 MultiServerMCPClient를 래핑하여 에러 처리, 재시도 로직, 로깅을 추가합니다. """ def __init__(self, mcp_config: Optional[Dict[str, Any]] = None): """ MCPClientManager 초기화 Args: mcp_config: MCP 서버 설정. None인 경우 기본 설정 사용 """ self.mcp_config = mcp_config or config.get_mcp_config() self.client: Optional[MultiServerMCPClient] = None self._is_connected = False self.session_id: Optional[str] = None self._session_headers: Dict[str, str] = {} self._connection_attempts = 0 self._max_connection_attempts = 3 logger.info("MCPClientManager 초기화됨", extra={"config": self.mcp_config}) async def __aenter__(self) -> 'MCPClientManager': """비동기 컨텍스트 매니저 진입""" await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: """비동기 컨텍스트 매니저 종료""" await self.disconnect() def _generate_session_id(self) -> str: """세션 ID 생성""" if not self.session_id: self.session_id = str(uuid.uuid4()) return self.session_id def _get_session_headers(self) -> Dict[str, str]: """세션 헤더 반환""" session_id = self._generate_session_id() headers = { "X-Session-ID": session_id, "X-Request-ID": str(uuid.uuid4()), "Accept": "text/event-stream, application/json", "Content-Type": "application/json" } return headers @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=8), retry=retry_if_exception_type((ConnectionError, TimeoutError, Exception)) ) async def connect(self) -> None: """MCP 서버에 연결""" try: if self._connection_attempts >= self._max_connection_attempts: raise RuntimeError(f"최대 연결 시도 횟수({self._max_connection_attempts})를 초과했습니다") self._connection_attempts += 1 logger.info(f"MCP 서버에 연결 중... (시도 {self._connection_attempts}/{self._max_connection_attempts})") self._session_headers = self._get_session_headers() # MCP 설정에 세션 헤더 추가 mcp_config = self.mcp_config.copy() if "github" in mcp_config: # 기존 헤더와 세션 헤더 병합 existing_headers = mcp_config["github"].get("headers", {}) mcp_config["github"]["headers"] = {**existing_headers, **self._session_headers} self.client = MultiServerMCPClient(mcp_config) await self._test_connection() self._is_connected = True self._connection_attempts = 0 # 성공 시 카운터 리셋 logger.info(f"MCP 서버 연결 성공 (세션: {self.session_id})") except Exception as e: logger.error(f"MCP 서버 연결 실패 (시도 {self._connection_attempts}): {e}") self._is_connected = False raise async def disconnect(self) -> None: """MCP 서버 연결 해제""" if self.client and self._is_connected: try: # 세션 종료 if self.session_id: logger.info(f"세션 종료: {self.session_id}") self.session_id = None # langchain-mcp-adapters는 자동으로 리소스 정리 logger.info("MCP 서버 연결 해제됨") except Exception as e: logger.error(f"MCP 서버 연결 해제 중 오류: {e}") finally: self.client = None self._is_connected = False async def _test_connection(self) -> None: """연결 테스트""" if not self.client: raise RuntimeError("MCP 클라이언트가 초기화되지 않았습니다") try: # 단순히 클라이언트가 초기화되었는지만 확인 logger.info("MCP 클라이언트 초기화 확인됨") except Exception as e: logger.error(f"MCP 서버 연결 테스트 실패: {e}") raise @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=8) ) async def get_tools(self) -> List[BaseTool]: """ MCP 서버에서 사용 가능한 도구 목록을 가져옵니다 Returns: LangChain Tool 객체들의 리스트 Raises: RuntimeError: 클라이언트가 연결되지 않은 경우 Exception: MCP 서버 통신 오류 """ if not self.client or not self._is_connected: raise RuntimeError("MCP 클라이언트가 연결되지 않았습니다") try: tools = await self.client.get_tools() logger.info(f"{len(tools)}개의 MCP 도구를 LangChain 도구로 변환했습니다") return tools except Exception as e: # ExceptionGroup 처리 if hasattr(e, '__cause__') and e.__cause__: logger.error(f"MCP 도구 목록 가져오기 실패 (원인): {e.__cause__}") raise e.__cause__ if e.__cause__ else e else: logger.error(f"MCP 도구 목록 가져오기 실패: {e}") raise @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=8) ) async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any: """ MCP 도구를 실행합니다 Args: tool_name: 실행할 도구 이름 arguments: 도구 실행에 필요한 인수 Returns: 도구 실행 결과 Raises: RuntimeError: 클라이언트가 연결되지 않은 경우 Exception: MCP 서버 통신 오류 """ if not self.client or not self._is_connected: raise RuntimeError("MCP 클라이언트가 연결되지 않은 경우") try: logger.info(f"MCP 도구 실행: {tool_name}", extra={"arguments": arguments, "session": self.session_id}) # MultiServerMCPClient의 도구 실행 방식에 따라 구현 # 실제 구현은 langchain-mcp-adapters의 API에 따라 달라질 수 있음 # result = await self.client.call_tool(tool_name, arguments) # 임시로 더미 결과 반환 (실제 구현 시 수정 필요) result = {"status": "success", "message": f"도구 {tool_name} 실행됨"} logger.info(f"MCP 도구 실행 완료: {tool_name}", extra={"result": result}) return result except Exception as e: logger.error(f"MCP 도구 실행 실패: {tool_name}", extra={"error": str(e), "arguments": arguments}) raise @property def is_connected(self) -> bool: """연결 상태 반환""" return self._is_connected async def health_check(self) -> Dict[str, Any]: """MCP 서버 상태 확인""" try: if not self.is_connected: return {"status": "disconnected", "error": "클라이언트가 연결되지 않음"} tools = await self.get_tools() return { "status": "connected", "tool_count": len(tools), "server_url": self.mcp_config.get("github", {}).get("url"), "session_id": self.session_id, "headers": self._session_headers, "connection_attempts": self._connection_attempts } except Exception as e: return {"status": "error", "error": str(e)} async def refresh_session(self) -> None: """세션 새로고침""" try: old_session = self.session_id self.session_id = None self._session_headers = self._get_session_headers() logger.info(f"세션 새로고침: {old_session} -> {self.session_id}") except Exception as e: logger.error(f"세션 새로고침 실패: {e}") def reset_connection_attempts(self) -> None: """연결 시도 횟수 리셋""" self._connection_attempts = 0 # 편의를 위한 컨텍스트 매니저 함수 @asynccontextmanager async def get_mcp_client(mcp_config: Optional[Dict[str, Any]] = None): """ MCP 클라이언트를 컨텍스트 매니저로 제공하는 편의 함수 Usage: async with get_mcp_client() as client: tools = await client.get_tools() result = await client.call_tool("tool_name", {"arg": "value"}) """ async with MCPClientManager(mcp_config) as client: yield client

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/J-nowcow/github-MCP-practice'

If you have feedback or need assistance with the MCP directory API, please join our Discord server