import json
import logging
import sys
import os
import utils
import boto3
import re
import chat
# Disable the memory feature. These flags are set ahead of the strands imports
# that follow -- presumably strands reads them at import time; TODO confirm.
os.environ.update({
    'STRANDS_DISABLE_MEMORY': 'true',
    'STRANDS_MEMORY_ENABLED': 'false',
    'BEDROCK_AGENTCORE_DISABLED': 'true',
})
from typing import Dict, List, Optional
from strands import Agent
from strands.models import BedrockModel
from botocore.config import Config
# from strands_tools import memory, retrieve  # memory feature disabled
from strands.agent.conversation_manager import SlidingWindowConversationManager
from strands.tools.mcp import MCPClient
from mcp import stdio_client, StdioServerParameters
# Root logging configuration: INFO and above, written to stderr, each record
# prefixed with the emitting file name and line number.
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stderr)],
    format='%(filename)s:%(lineno)d | %(message)s',
    level=logging.INFO,  # INFO is the default level
)

# Logger shared by every function in this module.
logger = logging.getLogger("agent")
# Position of the next free placeholder in containers['notification'].
index = 0


def add_notification(containers, message):
    """Show *message* as an info box in the next notification placeholder.

    Args:
        containers: Mapping whose 'notification' entry is an indexable
            collection of placeholders exposing ``info()``, or None when no
            UI is attached (nothing is rendered in that case).
        message: Text to display.

    The module-level ``index`` cursor is advanced by one on every call.
    """
    global index
    if containers is not None:
        placeholder = containers['notification'][index]
        placeholder.info(message)
    index += 1
def add_response(containers, message):
    """Render *message* as markdown in the next notification placeholder.

    Args:
        containers: Mapping whose 'notification' entry is an indexable
            collection of placeholders exposing ``markdown()``. Unlike
            ``add_notification`` this function does not accept None.
        message: Markdown text to display.

    The module-level ``index`` cursor is advanced by one after rendering.
    """
    global index
    placeholder = containers['notification'][index]
    placeholder.markdown(message)
    index += 1
# Ordered trail of the status labels reported so far in the current run.
status_msg = []


def get_status_msg(status):
    """Append *status* to the trail and return the rendered status line.

    The result is ``"[status]\\n"`` followed by every recorded label joined
    with ``" -> "``. A trailing ``"..."`` marks the run as still in progress;
    it is omitted only when *status* is the closing label ``"end)"``.
    """
    global status_msg
    status_msg.append(status)
    trail = " -> ".join(status_msg)
    suffix = "" if status == "end)" else "..."
    return "[status]\n" + trail + suffix
# Module-level defaults. Note that get_model() below takes its model id from
# chat.model_id, not from this constant.
model_id = "us.anthropic.claude-3-7-sonnet-20250219-v1:0"
aws_region = utils.bedrock_region


def get_model():
    """Build the BedrockModel used by the agent.

    A ``bedrock-runtime`` boto3 client is created for ``aws_region`` with long
    read/connect timeouts and adaptive retries, then wrapped in a
    ``BedrockModel`` configured with ``chat.model_id``, a 4096-token output
    cap, low temperature, and extended thinking disabled.

    Returns:
        The configured ``BedrockModel`` instance.
    """
    stop_sequences = ["\n\nHuman:"]
    max_output_tokens = 4096  # 4k

    # Bedrock client configuration
    client_config = Config(
        read_timeout=900,
        connect_timeout=900,
        retries={"max_attempts": 3, "mode": "adaptive"},
    )
    runtime_client = boto3.client(
        'bedrock-runtime',
        region_name=aws_region,
        config=client_config,
    )

    return BedrockModel(
        client=runtime_client,
        model_id=chat.model_id,
        max_tokens=max_output_tokens,
        stop_sequences=stop_sequences,
        temperature=0.1,
        top_p=0.9,
        additional_request_fields={
            "thinking": {
                "type": "disabled"
            }
        },
    )
def load_mcp_config():
    """Load and return the parsed contents of ``mcp.json``.

    The file is looked up in the directory that contains this module, not in
    the current working directory. Both the directory and the resolved path
    are logged to help diagnose lookup problems.

    Returns:
        The object produced by ``json.load`` for the file.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))
    json_path = os.path.join(base_dir, "mcp.json")

    # Debug: log the actual path being used
    logger.info(f"script_dir: {base_dir}")
    logger.info(f"config_path: {json_path}")

    with open(json_path, "r", encoding="utf-8") as handle:
        return json.load(handle)
# Hangul compatibility jamo (U+3131-U+3163) and precomposed syllables
# (U+AC00-U+D7A3). Compiled once instead of on every call.
_HANGUL_PATTERN = re.compile('[\u3131-\u3163\uac00-\ud7a3]+')


def isKorean(text):
    """Return True when *text* contains at least one Hangul character.

    Args:
        text: Any value; it is converted with ``str()`` before matching, so
            non-string inputs (including None) are accepted.

    Returns:
        bool: True if a Hangul jamo or syllable occurs anywhere in the text.
    """
    # re.search yields a Match object or None, so an identity check is all
    # that is needed (the former comparison against the string 'None' could
    # never be false for a Match object).
    return _HANGUL_PATTERN.search(str(text)) is not None
# Global variables - the agent is created anew on every run to prevent
# tool_use/tool_result matching problems.
agent, mcp_client = None, None
# The initialize_agent function was removed - run_agent handles setup directly.
def create_filtered_mcp_tools(client):
    """Return the client's MCP tools with parameter filtering applied.

    Every tool that exposes ``tool.tool.name`` gets its ``call_async``
    replaced by a wrapper that strips unwanted parameters (see
    ``filter_mcp_parameters``) from the tool-use input before delegating to
    the tool's own original ``call_async``. Tools without that attribute are
    returned untouched.

    Args:
        client: Object providing ``list_tools_sync()``.

    Returns:
        list: The same tool objects, in the order returned by the client.
    """

    def _make_filtered_call(tool_name, original_call):
        # A factory is required here: a closure defined directly inside the
        # loop would look up the loop variables at call time, so every wrapper
        # would end up invoking the *last* tool's call_async.
        async def filtered_call(tool_use, invocation_state):
            # Filter out problematic parameters. Both mapping-style tool uses
            # (the shape read elsewhere in this module) and attribute-style
            # objects are supported.
            if isinstance(tool_use, dict):
                if isinstance(tool_use.get("input"), dict):
                    tool_use["input"] = filter_mcp_parameters(tool_name, tool_use["input"])
            elif isinstance(getattr(tool_use, "input", None), dict):
                tool_use.input = filter_mcp_parameters(tool_name, tool_use.input)
            return await original_call(tool_use, invocation_state)

        return filtered_call

    filtered_tools = []
    for tool in client.list_tools_sync():
        if hasattr(tool, 'tool') and hasattr(tool.tool, 'name'):
            # Replace the call method with the filtering wrapper
            tool.call_async = _make_filtered_call(tool.tool.name, tool.call_async)
        filtered_tools.append(tool)
    return filtered_tools
def get_tool_info(tool_name, tool_content):
    """Extract URLs and reference entries from a tool result payload.

    Args:
        tool_name: Name of the tool that produced the payload (currently
            unused; kept for interface compatibility).
        tool_content: A dict, a list, or a JSON string that decodes to one.

    Returns:
        tuple: ``(content, urls, tool_references)`` where

        * ``content`` is always the empty string,
        * ``urls`` holds the value(s) of a top-level ``"path"`` key when the
          payload is a dict,
        * ``tool_references`` holds ``{"url", "title", "content"}`` dicts
          built from list items that carry ``"reference"`` and ``"contents"``;
          the content is cut to 200 characters plus ``"..."`` and newlines
          are removed.

    Payloads that cannot be decoded yield empty results. A malformed list
    item is logged and skipped without discarding the other items.
    """
    tool_references = []
    urls = []
    content = ""  # never populated; kept so the 3-tuple shape stays stable

    if isinstance(tool_content, (dict, list)):
        json_data = tool_content
    else:
        try:
            json_data = json.loads(tool_content)
        except (json.JSONDecodeError, TypeError) as e:
            logger.info(f"get_tool_info error: {e}")
            return content, urls, tool_references

    logger.info(f"json_data: {json_data}")

    if isinstance(json_data, dict):
        if "path" in json_data:  # path
            path = json_data["path"]
            if isinstance(path, list):
                urls.extend(path)
            else:
                urls.append(path)
    elif isinstance(json_data, list):
        for item in json_data:
            logger.info(f"item: {item}")
            if not (isinstance(item, dict) and "reference" in item and "contents" in item):
                continue

            try:
                reference = item["reference"]
                url = reference["url"]
                title = reference["title"]
            except (KeyError, TypeError) as e:
                # Skip only this item; the remaining ones may still be valid.
                logger.info(f"get_tool_info error: {e}")
                continue

            contents = item["contents"]
            if not isinstance(contents, str):
                # Slicing and replace() below are only meaningful for text.
                logger.info("get_tool_info error: 'contents' is not a string")
                continue

            content_text = contents[:200] + "..." if len(contents) > 200 else contents
            tool_references.append({
                "url": url,
                "title": title,
                "content": content_text.replace("\n", ""),
            })

    logger.info(f"tool_references: {tool_references}")
    return content, urls, tool_references
def get_reference(references):
    """Format reference entries as a numbered markdown "Reference" section.

    Args:
        references: Iterable of dicts with 'title', 'url' and 'content' keys;
            may be empty or None.

    Returns:
        str: ``""`` when there are no references, otherwise a heading followed
        by one ``N. [title](url), content...`` line per entry.
    """
    if not references:
        return ""

    entries = [
        f"{position}. [{entry['title']}]({entry['url']}), {entry['content']}...\n"
        for position, entry in enumerate(references, start=1)
    ]
    return "\n\n### Reference\n" + "".join(entries)
def filter_mcp_parameters(tool_name, input_params):
    """Return a copy of *input_params* without session-id style parameters.

    Args:
        tool_name: Tool the parameters belong to; used only in the log line
            written for each removed key.
        input_params: Parameter mapping. Anything that is not a dict is
            returned unchanged.

    Returns:
        A new dict with the remaining keys in their original order, or the
        input itself when it is not a dict.
    """
    if not isinstance(input_params, dict):
        return input_params

    # Known problematic parameters that should be filtered out
    blocked = ('mcp-session-id', 'session-id', 'session_id')

    kept = {}
    for key, value in input_params.items():
        if key in blocked:
            logger.info(f"Filtered out problematic parameter '{key}' for tool '{tool_name}'")
            continue
        kept[key] = value
    return kept
async def show_streams(agent_stream, containers):
    """Consume the agent's event stream, mirror it to the UI, and return the final text.

    Two kinds of events are handled:

    * events with a ``"message"`` key, whose ``message["content"]`` parts may
      carry ``"text"``, ``"toolUse"`` and/or ``"toolResult"`` entries;
    * events with a ``"data"`` key, which carry a streamed text fragment that
      is accumulated and re-rendered in the current notification placeholder.

    Args:
        agent_stream: Async iterable of event mappings.
        containers: Mapping of UI placeholders ('notification', 'status' and,
            optionally, 'progress' / 'agent_status'), or None to run without
            any UI updates.

    Returns:
        str: Text of the last ``"text"`` part seen, or ``""`` if there was none.
    """
    tool_name = ""
    result = ""
    current_response = ""
    # Collected from tool results; currently not appended to the returned
    # text (get_reference(references) would render them).
    references = []
    progress_step = 20

    async for event in agent_stream:
        if "message" in event:
            message = event["message"]
            logger.info(f"message: {message}")

            for part in message["content"]:
                logger.info(f"content: {part}")

                if "text" in part:
                    logger.info(f"text: {part['text']}")
                    if containers is not None:
                        add_response(containers, part['text'])
                    result = part['text']
                    # A complete text part supersedes the streamed fragments.
                    current_response = ""

                if "toolUse" in part:
                    tool_use = part["toolUse"]
                    logger.info(f"tool_use: {tool_use}")

                    tool_name = tool_use["name"]
                    input_params = tool_use["input"]

                    # Filter out problematic parameters
                    filtered_input = filter_mcp_parameters(tool_name, input_params)
                    logger.info(f"tool_name: {tool_name}, original_arg: {input_params}, filtered_arg: {filtered_input}")

                    if containers is not None:
                        add_notification(containers, f"tool name: {tool_name}, arg: {filtered_input}")
                        containers['status'].info(get_status_msg(f"{tool_name}"))

                        # Update the progress bar (capped at 90 while running)
                        if 'progress' in containers:
                            progress_step += 15
                            containers['progress'].progress(min(progress_step, 90))

                        # Update the agent status
                        if 'agent_status' in containers and tool_name == 'create_comprehensive_trip_report':
                            containers['status'].info("🤖 6개 Agent 병렬 실행 중...")

                if "toolResult" in part:
                    tool_result = part["toolResult"]
                    logger.info(f"tool_name: {tool_name}")
                    logger.info(f"tool_result: {tool_result}")

                    if "content" in tool_result:
                        # Distinct names are used here so that neither the
                        # outer loop variable nor the item being inspected is
                        # overwritten by get_tool_info's return value.
                        for result_item in tool_result['content']:
                            if "text" not in result_item:
                                continue

                            item_text = result_item['text']
                            if containers is not None:
                                add_notification(containers, f"tool result: {item_text}")

                            info_content, urls, refs = get_tool_info(tool_name, item_text)
                            logger.info(f"content: {info_content}")
                            logger.info(f"urls: {urls}")
                            logger.info(f"refs: {refs}")

                            if refs:
                                references.extend(refs)

        if "data" in event:
            current_response += event["data"]
            if containers is not None:
                # ``index`` is the module-level notification cursor.
                containers["notification"][index].markdown(current_response)

    return result
def get_tool_list(tools):
    """Return a display name for every entry in *tools*, in order.

    The name is taken from the first of these that applies:

    1. the ``tool_name`` attribute,
    2. the ``name`` attribute,
    3. the ``__name__`` attribute (functions, modules),
    4. the last dotted component of a ``<module 'strands_tools....'>`` repr,
    5. ``tool.tool.name`` for objects whose ``str()`` mentions
       ``MCPAgentTool``, falling back to ``MCP_Tool_<position>``,
    6. ``str(tool)``.

    Args:
        tools: Iterable of tool objects.

    Returns:
        list[str]: One name per tool.
    """
    tool_list = []
    for tool in tools:
        if hasattr(tool, 'tool_name'):  # MCP tool
            name = tool.tool_name
        elif hasattr(tool, 'name'):  # MCP tool with name attribute
            name = tool.name
        elif hasattr(tool, '__name__'):  # Function or module
            name = tool.__name__
        else:
            description = str(tool)
            if description.startswith("<module 'strands_tools."):
                name = description.split("'")[1].split('.')[-1]
            elif 'MCPAgentTool' in description:
                # Try to extract the tool name from the MCP tool; use a
                # positional placeholder when that is not possible.
                placeholder = f"MCP_Tool_{len(tool_list)}"
                try:
                    name = tool.tool.name if hasattr(tool, 'tool') else placeholder
                except Exception:
                    # Best effort only, but never swallow KeyboardInterrupt
                    # or SystemExit as a bare ``except`` would.
                    name = placeholder
            else:
                name = description
        tool_list.append(name)
    return tool_list
def create_mcp_client(mcp_server_name: str):
    """Create an MCPClient for the named server from the MCP configuration.

    The ``"mcpServers"`` section of the loaded config is scanned in order;
    each visited entry is logged, and the first entry whose name equals
    *mcp_server_name* is turned into a stdio-based ``MCPClient`` built from
    its ``"command"``, ``"args"`` and optional ``"env"`` values.

    Args:
        mcp_server_name: Key of the server under ``"mcpServers"``.

    Returns:
        The ``MCPClient`` for the server, or None if no entry matches.
    """
    servers = load_mcp_config()["mcpServers"]

    for name, settings in servers.items():
        logger.info(f"server_name: {name}")
        # NOTE(review): this logs the whole server entry, which may include
        # "env" values -- confirm no secrets are stored there.
        logger.info(f"server_config: {settings}")

        env = None
        if "env" in settings:
            env = settings["env"]

        if name != mcp_server_name:
            continue

        # The factory is invoked later by MCPClient; returning immediately
        # keeps ``settings`` and ``env`` bound to the matching entry.
        return MCPClient(lambda: stdio_client(
            StdioServerParameters(
                command=settings["command"],
                args=settings["args"],
                env=env
            )
        ))

    return None
tool_list = None


def _report_status(containers, level, message):
    """Call ``containers['status'].<level>(message)`` when a UI is attached."""
    if containers is not None:
        getattr(containers['status'], level)(message)


async def run_agent(query: str, containers):
    """Run one agent turn for *query* and return the final answer text.

    Steps:
      1. Reset the module-level notification cursor and status trail.
      2. Create MCP clients for the ``knowledge_base``, ``business_trip``,
         ``notion`` and ``pdf_generator`` servers.
      3. Open all clients, gather their tools, build a fresh ``Agent`` (new
         conversation manager and model each time) and stream the response
         through ``show_streams``.

    Args:
        query: The user's request.
        containers: Mapping of UI placeholders ('status', 'notification',
            optionally 'tools', ...), or None to run without UI updates.

    Returns:
        str: The agent's final text, or a user-facing error message when the
        MCP clients cannot be created or the agent run fails. Exceptions are
        logged and reported rather than propagated.
    """
    global index, status_msg
    index = 0
    status_msg = []

    _report_status(containers, "info", get_status_msg("(start"))

    try:
        # Create the MCP clients the agent will use
        knowledge_base_client = create_mcp_client("knowledge_base")
        business_trip_client = create_mcp_client("business_trip")
        notion_client = create_mcp_client("notion")
        pdf_client = create_mcp_client("pdf_generator")

        # create_mcp_client returns None for a server missing from the config
        if not all([knowledge_base_client, business_trip_client, notion_client, pdf_client]):
            raise RuntimeError("MCP 클라이언트 생성 실패")
    except Exception as e:
        logger.exception(f"MCP 클라이언트 초기화 오류: {e}")
        _report_status(containers, "error", f"MCP 서버 연결 실패: {e}")
        return "MCP 서버 연결에 실패했습니다. 환경 설정을 확인해주세요."

    try:
        with knowledge_base_client, business_trip_client, notion_client, pdf_client:
            # Get tools from all MCP servers
            tools = []
            for client in (knowledge_base_client, business_trip_client, notion_client, pdf_client):
                tools.extend(client.list_tools_sync())

            # Local name on purpose: the module-level ``tool_list`` is not updated.
            tool_names = get_tool_list(tools)
            if chat.debug_mode and containers is not None and tool_names:
                containers['tools'].info(f"tool_list: {tool_names}")

            # A new conversation manager per run prevents tool_use/tool_result
            # matching problems.
            fresh_conversation_manager = SlidingWindowConversationManager(
                window_size=5,  # small window to avoid memory issues
            )

            system_prompt = (
                "당신의 이름은 현민이고, 질문에 대해 친절하게 답변하는 사려깊은 인공지능 도우미입니다."
                "상황에 맞는 구체적인 세부 정보를 충분히 제공합니다."
                "모르는 질문을 받으면 솔직히 모른다고 말합니다."
                "\n\n출장 준비와 관련된 질문을 받으면, 다음 순서로 진행합니다:"
                "1. business_trip 도구들로 종합 가이드 생성:"
                " - get_destination_weather: 목적지 날씨 정보"
                " - get_destination_info: 국가 기본 정보"
                " - analyze_business_culture: 비즈니스 문화 분석"
                " - get_packing_recommendations: 짐 추천"
                "2. 종합 리포트 생성 후 반드시 add_to_notion_page로 자동 저장"
                "3. Notion 저장 후 전체 내용 표시"
                "\n'pdf 만들어주세요' 요청 시 generate_business_trip_pdf 도구를 사용하여 "
                "Summary, Basic Details, Approval Request for Overseas Business Travel, Key Details 섹션이 포함된 "
                "공식 출장 계획서 PDF를 생성하세요."
                "\n출장 질문에는 사용자에게 묻지 말고 자동으로 Notion에 저장하세요."
            )

            agent = Agent(
                model=get_model(),
                system_prompt=system_prompt,
                tools=tools,
                conversation_manager=fresh_conversation_manager
            )

            agent_stream = agent.stream_async(query)
            result = await show_streams(agent_stream, containers)
            logger.info(f"result: {result}")

            _report_status(containers, "info", get_status_msg("end)"))
            return result
    except Exception as e:
        logger.exception(f"Agent 실행 오류: {e}")
        _report_status(containers, "error", f"Agent 실행 실패: {e}")
        return f"오류가 발생했습니다: {e}"