mcp_chatbot.pyā¢8.01 kB
#!/usr/bin/env python3
import asyncio
import nest_asyncio
import json
from typing import List
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
import ollama # pip install ollama
nest_asyncio.apply()
class MCP_ChatBot:
def __init__(self, model="qwen2.5:14b", max_retries=3):
self.session: ClientSession = None
self.model = model
self.available_tools: List[dict] = []
self.max_retries = max_retries
self.messages: List[dict] = []
async def build_system_prompt(self):
if not self.available_tools:
return "You are a helpful assistant."
tools_desc = "\n".join(
[f"- {tool['name']}: {tool['description']}" for tool in self.available_tools]
)
return (
"You are a helpful assistant connected to external tools via MCP.\n\n"
f"TOOLS YOU CAN USE:\n{tools_desc}\n\n"
"If the user request matches a tool, respond ONLY with JSON objects in the form:\n"
'{"tool": "<tool_name>", "arguments": {"arg1": "value", ...}}\n'
"You can return multiple tool calls in a list for multi-step tasks.\n"
"Do not add extra text outside JSON unless it is plain explanation for the user."
)
async def call_tool(self, tool_name: str, arguments: dict):
try:
response = await self.session.call_tool(tool_name, arguments)
return response.content[0].text
except Exception as e:
return f"[Tool Error] {str(e)}"
def prepare_tool_arguments(self, tool_name: str, user_args: dict) -> dict:
tool_info = next((t for t in self.available_tools if t["name"] == tool_name), None)
if not tool_info:
return user_args
expected_fields = tool_info.get("input_schema", {}).get("properties", {}).keys()
prepared_args = {}
for field in expected_fields:
for k, v in user_args.items():
if k.lower() == field.lower():
prepared_args[field] = v
if field not in prepared_args:
print(f"ā Missing expected argument '{field}' for tool '{tool_name}'")
return prepared_args
def split_json_and_text(self, text: str):
stack = []
json_end_idx = None
for i, c in enumerate(text):
if c == "{":
stack.append("{")
elif c == "}":
if stack:
stack.pop()
if not stack:
json_end_idx = i + 1
break
if json_end_idx:
json_part = text[:json_end_idx].strip()
remaining_text = text[json_end_idx:].strip() or None
try:
tool_call = json.loads(json_part)
except json.JSONDecodeError:
tool_call = None
return tool_call, remaining_text
return None, text
def extract_tools_from_text(self, text: str):
text = text.strip()
tools = []
remaining_text = None
# Try JSON array
if text.startswith("[") and text.endswith("]"):
try:
tools = json.loads(text)
return tools, None
except json.JSONDecodeError:
pass
# Fall back to single JSON object
tool, remaining_text = self.split_json_and_text(text)
if tool:
tools.append(tool)
return tools, remaining_text
async def process_query(self, query: str):
self.messages.append({"role": "user", "content": query})
while True:
retries = 0
while retries < self.max_retries:
print(f"Try number : {retries+1}")
try:
response = ollama.chat(
model=self.model,
messages=self.messages,
options={"num_predict": 1024}
)
assistant_output = response["message"]["content"].strip()
print(f"\nš¤ Raw model output: {assistant_output}")
tool_calls, remaining_text = self.extract_tools_from_text(assistant_output)
if remaining_text:
print(f"š¬ Assistant: {remaining_text}")
self.messages.append({"role": "assistant", "content": remaining_text})
if not tool_calls:
return assistant_output
for tool_call in tool_calls:
if "tool" not in tool_call or "arguments" not in tool_call:
continue
tool_name = tool_call["tool"]
user_args = tool_call["arguments"]
validated_args = self.prepare_tool_arguments(tool_name, user_args)
if tool_name not in [t["name"] for t in self.available_tools]:
print(f"ā Unknown tool: {tool_name}, skipping.")
continue
try:
print(f"āļø Calling tool `{tool_name}` with arguments {validated_args} ...")
tool_result = await self.call_tool(tool_name, validated_args)
print(f"š Tool result: {tool_result}")
self.messages.append({"role": "assistant", "content": json.dumps(tool_call)})
self.messages.append({"role": "user", "content": f"Tool result: {tool_result}"})
except Exception as e:
print(f"ā Tool execution failed: {str(e)}. Retrying...")
retries += 1
await asyncio.sleep(1)
break
else:
break
except json.JSONDecodeError as e:
print(f"ā JSON parse error: {str(e)}. Retrying...")
retries += 1
await asyncio.sleep(1)
else:
print("ā Max retries reached. Returning raw output.")
self.messages.append({"role": "assistant", "content": assistant_output})
return assistant_output
async def chat_loop(self):
print("\nMCP Chatbot Started (Ollama/Qwen2.5:14b)!")
print("Type your queries or 'quit' to exit.")
while True:
try:
query = input("\nQuery: ").strip()
if query.lower() == "quit":
break
await self.process_query(query)
print("\n")
except Exception as e:
print(f"\nError: {str(e)}")
async def connect_to_server_and_run(self):
server_params = StdioServerParameters(
command="uv",
args=["run", "main.py"], # š runs your Podman MCP server
env=None,
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
self.session = session
await session.initialize()
response = await session.list_tools()
tools = response.tools
print("\nConnected to server with tools:", [tool.name for tool in tools])
self.available_tools = [
{"name": tool.name, "description": tool.description, "input_schema": tool.inputSchema}
for tool in response.tools
]
system_prompt = await self.build_system_prompt()
self.messages.append({"role": "system", "content": system_prompt})
await self.chat_loop()
async def main():
chatbot = MCP_ChatBot()
await chatbot.connect_to_server_and_run()
if __name__ == "__main__":
asyncio.run(main())