Skip to main content
Glama
workflow.py8.2 kB
""" MCP Agent 워크플로우 LangGraph를 사용하여 ReAct 에이전트를 구성하고 MCP 도구를 통합합니다. """ import logging from typing import Dict, Any, List, Optional from langgraph.prebuilt import create_react_agent from langchain_openai import AzureChatOpenAI from langchain.tools import BaseTool from langchain.schema import BaseMessage, HumanMessage, AIMessage from langchain_mcp_adapters.client import MultiServerMCPClient from mcp_client.config import config logger = logging.getLogger(__name__) class MCPAgentWorkflow: """ MCP 도구를 사용하는 LangGraph ReAct 에이전트 워크플로우 Azure OpenAI 모델과 MCP 도구를 통합하여 사용자 요청을 처리합니다. """ def __init__( self, mcp_client: MultiServerMCPClient, model_name: Optional[str] = None, temperature: Optional[float] = None ): """ MCPAgentWorkflow 초기화 Args: mcp_client: MultiServerMCPClient 인스턴스 model_name: Azure OpenAI 모델 이름 (기본값: config에서 가져옴) temperature: 모델 온도 (기본값: config에서 가져옴) """ self.mcp_client = mcp_client self.model_name = model_name or config.AZURE_OPENAI_DEPLOYMENT self.temperature = temperature or config.OPENAI_TEMPERATURE # Azure OpenAI 모델 초기화 self.model = self._create_model() # 워크플로우 구성 self.graph = None # 초기화 시점에는 None, 실행 시점에 생성 logger.info( "MCPAgentWorkflow 초기화됨", model=self.model_name, temperature=self.temperature ) def _create_model(self) -> AzureChatOpenAI: """Azure OpenAI 모델 생성""" return AzureChatOpenAI( azure_deployment=self.model_name, azure_endpoint=config.AZURE_OPENAI_ENDPOINT, api_key=config.AZURE_OPENAI_API_KEY, api_version=config.AZURE_OPENAI_API_VERSION, temperature=self.temperature, max_tokens=4000 ) def _build_workflow(self, tools: List[BaseTool]): """도구 목록으로 워크플로우 구성""" try: logger.info(f"{len(tools)}개의 도구로 워크플로우 구성 중...") # create_react_agent가 자체적으로 StateGraph를 반환 graph = create_react_agent(model=self.model, tools=tools) logger.info("LangGraph 워크플로우 구성 완료") return graph except Exception as e: logger.error(f"워크플로우 구성 실패: {e}") raise def _modify_state(self, state: Dict[str, Any]) -> Dict[str, Any]: """ 상태 수정 함수 LangGraph의 상태를 ReAct 에이전트에 맞게 조정합니다. Args: state: 현재 상태 Returns: 수정된 상태 """ # 메시지가 없는 경우 빈 리스트로 초기화 if "messages" not in state: state["messages"] = [] # next 필드가 없는 경우 "agent"로 설정 if "next" not in state: state["next"] = "agent" return state async def invoke(self, query: str, thread_id: Optional[str] = None, **kwargs) -> Dict[str, Any]: try: logger.info("워크플로우 실행 시작", query=query, thread_id=thread_id) # 실행 시점에 실제 도구 목록 가져오기 tools = await self.mcp_client.get_tools() logger.info(f"실행 시점에 {len(tools)}개의 도구를 가져왔습니다") # 워크플로우 구성 self.graph = self._build_workflow(tools) messages = [HumanMessage(content=query)] result = await self.graph.ainvoke({"messages": messages}, config={"configurable": {"thread_id": thread_id}} if thread_id else {}) response = self._process_result(result) logger.info("워크플로우 실행 완료", response_length=len(response.get("messages", []))) return response except Exception as e: logger.error(f"워크플로우 실행 실패: {e}", query=query) raise def _process_result(self, result: Dict[str, Any]) -> Dict[str, Any]: """ 워크플로우 실행 결과 처리 Args: result: 원시 실행 결과 Returns: 처리된 결과 """ try: messages = result.get("messages", []) # 사용된 도구 정보 추출 used_tools = self._extract_used_tools(messages) # 최종 응답 추출 final_response = self._extract_final_response(messages) return { "response": final_response, "used_tools": used_tools, "messages": messages, "status": "success" } except Exception as e: logger.error("결과 처리 실패", error=str(e)) return { "response": "결과 처리 중 오류가 발생했습니다", "error": str(e), "status": "error" } def _extract_used_tools(self, messages: List[BaseMessage]) -> List[Dict[str, Any]]: """ 메시지에서 사용된 도구 정보 추출 Args: messages: 메시지 리스트 Returns: 사용된 도구 정보 리스트 """ used_tools = [] for i, message in enumerate(messages): if hasattr(message, 'tool_calls') and message.tool_calls: for tool_call in message.tool_calls: # 다음 메시지에서 도구 실행 결과 찾기 if i + 1 < len(messages) and hasattr(messages[i + 1], 'tool_call_id'): tool_result = messages[i + 1].content used_tools.append({ "name": tool_call.get("name", "unknown"), "arguments": tool_call.get("args", {}), "result": tool_result }) return used_tools def _extract_final_response(self, messages: List[BaseMessage]) -> str: """ 최종 응답 메시지 추출 Args: messages: 메시지 리스트 Returns: 최종 응답 텍스트 """ # 마지막 AI 메시지 찾기 for message in reversed(messages): if isinstance(message, AIMessage) and message.content: return message.content return "응답을 생성할 수 없습니다" async def health_check(self) -> Dict[str, Any]: """워크플로우 상태 확인""" try: # MultiServerMCPClient에서 직접 도구 가져오기 tools = await self.mcp_client.get_tools() tools_count = len(tools) return { "workflow": "ready" if self.graph else "not_initialized", "model": self.model_name, "mcp_client": { "status": "connected", "tool_count": tools_count, "client_type": "MultiServerMCPClient" }, "tools_available": tools_count } except Exception as e: return {"workflow": "error", "error": str(e)} def get_workflow_info(self) -> Dict[str, Any]: """워크플로우 정보 반환""" return { "model": self.model_name, "temperature": self.temperature, "nodes": list(self.graph.nodes.keys()) if hasattr(self.graph, 'nodes') else [], "checkpointer": "MemorySaver" }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/J-nowcow/github-MCP-practice'

If you have feedback or need assistance with the MCP directory API, please join our Discord server