"""MCP 클라이언트"""
import asyncio
import json
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from typing import Optional
class MCPClient:
"""MCP 서버와 통신하는 클라이언트"""
def __init__(self):
self.session: Optional[ClientSession] = None
self.read_stream = None
self.write_stream = None
self._context_manager = None
async def connect(self):
"""MCP 서버에 연결"""
server_params = StdioServerParameters(
command="python",
args=["-m", "src.mcp_server"],
env=None
)
# stdio_client는 async context manager이므로 async with 사용
self._context_manager = stdio_client(server_params)
self.read_stream, self.write_stream = await self._context_manager.__aenter__()
self.session = ClientSession(self.read_stream, self.write_stream)
await self.session.__aenter__()
# 초기화
await self.session.initialize()
# 사용 가능한 도구 목록 가져오기
response = await self.session.list_tools()
print(f"사용 가능한 도구: {[tool.name for tool in response.tools]}")
async def call_tool(self, tool_name: str, arguments: dict) -> str:
"""
MCP 도구 호출
Args:
tool_name: 도구 이름
arguments: 도구 인자
Returns:
도구 실행 결과 텍스트
"""
if not self.session:
raise RuntimeError("MCP 서버에 연결되지 않았습니다. connect()를 먼저 호출하세요.")
result = await self.session.call_tool(tool_name, arguments=arguments)
# 결과 텍스트 추출
if result.content and len(result.content) > 0:
return result.content[0].text
return "결과를 찾을 수 없습니다."
async def search_official_docs(self, query: str) -> dict:
"""공식 문서 검색 (편의 메서드)"""
result_text = await self.call_tool("search_official_docs", {"query": query})
# JSON 문자열을 딕셔너리로 파싱
try:
return json.loads(result_text)
except json.JSONDecodeError:
# JSON 파싱 실패 시 오류 메시지 반환
raise ValueError(f"JSON 파싱 실패: {result_text}")
async def save_document(self, title: str, url: str, content: str) -> str:
"""문서를 벡터 DB에 저장 (편의 메서드)"""
return await self.call_tool("save_document", {
"title": title,
"url": url,
"content": content
})
async def get_document(self, vector_id: str) -> str:
"""vector_id로 문서 조회 (편의 메서드)"""
return await self.call_tool("get_document", {"vector_id": vector_id})
async def qa_with_redis(self, vector_id: str, query: str) -> str:
"""vector_id의 문서를 기반으로 질문 답변 (편의 메서드)"""
return await self.call_tool("qa_with_redis", {
"vector_id": vector_id,
"query": query
})
async def disconnect(self):
"""MCP 서버 연결 해제"""
if self.session:
await self.session.__aexit__(None, None, None)
self.session = None
if self._context_manager:
await self._context_manager.__aexit__(None, None, None)
self._context_manager = None
# 전역 MCP 클라이언트 인스턴스
mcp_client = MCPClient()