# MCP2Lambda — official example by danilop
# Module: mcp_client_bedrock
import json
import re
import boto3
class ConverseAgent:
    """Conversational agent driving the Amazon Bedrock Converse API.

    Maintains the running message history, forwards model tool-use requests
    to an attached tool executor (``self.tools``), and loops until the model
    produces a final text answer.
    """

    def __init__(self, model_id, region='us-west-2', system_prompt='You are a helpful assistant.'):
        """Create the Bedrock runtime client and initialize conversation state.

        Args:
            model_id: Bedrock model identifier to invoke.
            region: AWS region for the ``bedrock-runtime`` client.
            system_prompt: System prompt sent with every converse call.
        """
        self.model_id = model_id
        self.region = region
        self.client = boto3.client('bedrock-runtime', region_name=self.region)
        self.system_prompt = system_prompt
        self.messages = []
        # Tool executor, assigned by the caller after construction. It is
        # expected to provide get_tools() and an async execute_tool().
        self.tools = None
        # Optional [open_tag, close_tag] pair; when exactly two tags are set
        # (e.g. ['<response>', '</response>']), only the text between them is
        # returned from a final answer.
        self.response_output_tags = []

    async def invoke_with_prompt(self, prompt):
        """Send a plain-text user prompt and return the model's final text."""
        content = [
            {
                'text': prompt
            }
        ]
        return await self.invoke(content)

    async def invoke(self, content):
        """Append *content* as a user message, call the model, and handle the reply.

        Args:
            content: Converse-API content list (text blocks or toolResult blocks).

        Returns:
            The final text produced by ``_handle_response``.
        """
        print(f"User: {json.dumps(content, indent=2)}")
        self.messages.append(
            {
                "role": "user",
                "content": content
            }
        )
        response = self._get_converse_response()
        print(f"Agent: {json.dumps(response, indent=2)}")
        return await self._handle_response(response)

    def _get_converse_response(self):
        """Call the Bedrock Converse API with the current message history.

        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-runtime/client/converse.html
        """
        kwargs = {
            'modelId': self.model_id,
            'messages': self.messages,
            'system': [
                {
                    "text": self.system_prompt
                }
            ],
            'inferenceConfig': {
                "maxTokens": 4096,
                "temperature": 0.7,
            },
        }
        # BUG FIX: only pass toolConfig when a tool executor is attached.
        # The original always called self.tools.get_tools(), which raised
        # AttributeError because __init__ initializes self.tools to None.
        if self.tools is not None:
            kwargs['toolConfig'] = self.tools.get_tools()
        return self.client.converse(**kwargs)

    async def _handle_response(self, response):
        """Dispatch on the model's stop reason.

        Returns the extracted text for ``end_turn``/``stop_sequence``,
        executes requested tools and recurses for ``tool_use``, asks the
        model to continue on ``max_tokens``, and raises ``ValueError`` for
        any unknown stop reason.
        """
        # Record the assistant turn in the conversation history.
        self.messages.append(response['output']['message'])

        stop_reason = response['stopReason']

        if stop_reason in ['end_turn', 'stop_sequence']:
            # Safely extract the text from the nested response structure.
            try:
                message = response.get('output', {}).get('message', {})
                content = message.get('content', [])
                text = content[0].get('text', '')
                if hasattr(self, 'response_output_tags') and len(self.response_output_tags) == 2:
                    pattern = f"(?s).*{re.escape(self.response_output_tags[0])}(.*?){re.escape(self.response_output_tags[1])}"
                    match = re.search(pattern, text)
                    if match:
                        return match.group(1)
                return text
            except (KeyError, IndexError):
                # Malformed or empty response content: return an empty answer
                # rather than propagating the lookup error.
                return ''

        elif stop_reason == 'tool_use':
            try:
                # Execute every tool the model asked for and collect results.
                tool_response = []
                for content_item in response['output']['message']['content']:
                    if 'toolUse' in content_item:
                        tool_request = {
                            "toolUseId": content_item['toolUse']['toolUseId'],
                            "name": content_item['toolUse']['name'],
                            "input": content_item['toolUse']['input']
                        }
                        tool_result = await self.tools.execute_tool(tool_request)
                        tool_response.append({'toolResult': tool_result})
                # Feed the tool results back to the model as the next user turn.
                return await self.invoke(tool_response)
            except KeyError as e:
                raise ValueError(f"Missing required tool use field: {e}")
            except Exception as e:
                raise ValueError(f"Failed to execute tool: {e}")

        elif stop_reason == 'max_tokens':
            # Hit the token limit: ask the model to continue.
            # BUG FIX: the original omitted `return`, silently discarding the
            # continuation's result and handing callers None.
            return await self.invoke_with_prompt('Please continue.')

        else:
            raise ValueError(f"Unknown stop reason: {stop_reason}")