Database MCP Server

by kunwarmahen
mcp_chatbot.py
import asyncio
import json
from typing import List, Optional

import nest_asyncio
import ollama
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Allow re-entrant event loops (useful when running inside notebooks).
nest_asyncio.apply()


class MCP_ChatBot:
    def __init__(self, model="qwen2.5:14b", max_retries=3):
        self.session: Optional[ClientSession] = None
        self.model = model
        self.available_tools: List[dict] = []
        self.max_retries = max_retries
        self.messages: List[dict] = []  # conversation memory

    async def build_system_prompt(self):
        if not self.available_tools:
            return "You are a helpful assistant."
        tools_desc = "\n".join(
            [f"- {tool['name']}: {tool['description']}" for tool in self.available_tools]
        )
        return (
            "You are a helpful assistant connected to external tools via MCP.\n\n"
            f"TOOLS YOU CAN USE:\n{tools_desc}\n\n"
            "If the user request matches a tool, respond ONLY with JSON objects in the form:\n"
            '{"tool": "<tool_name>", "arguments": {"arg1": "value", ...}}\n'
            "You can return multiple tool calls in a list for multi-step tasks.\n"
            "Do not add extra text outside JSON unless it is plain explanation for the user."
        )

    async def call_tool(self, tool_name: str, arguments: dict):
        try:
            response = await self.session.call_tool(tool_name, arguments)
            # return response.content[0].text
            return response.content
        except Exception as e:
            return f"[Tool Error] {str(e)}"

    def prepare_tool_arguments(self, tool_name: str, user_args: dict) -> dict:
        """Map user-supplied argument names onto the tool's input schema."""
        tool_info = next((t for t in self.available_tools if t["name"] == tool_name), None)
        if not tool_info:
            return user_args
        expected_fields = tool_info.get("input_schema", {}).get("properties", {}).keys()
        prepared_args = {}
        for field in expected_fields:
            for k, v in user_args.items():
                if k.lower() == field.lower() or (k.lower() == "username" and field.lower() == "name"):
                    prepared_args[field] = v
            if field not in prepared_args:
                print(f"⚠ Missing expected argument '{field}' for tool '{tool_name}'")
        return prepared_args

    def split_json_and_text(self, text: str):
        """Split model output into the first balanced JSON object and any trailing text."""
        stack = []
        json_end_idx = None
        for i, c in enumerate(text):
            if c == "{":
                stack.append("{")
            elif c == "}":
                if stack:
                    stack.pop()
                    if not stack:
                        json_end_idx = i + 1
                        break
        if json_end_idx:
            json_part = text[:json_end_idx].strip()
            remaining_text = text[json_end_idx:].strip() or None
            try:
                tool_call = json.loads(json_part)
            except json.JSONDecodeError:
                tool_call = None
            return tool_call, remaining_text
        return None, text

    def extract_tools_from_text(self, text: str):
        text = text.strip()
        tools = []
        remaining_text = None
        # Try JSON array
        if text.startswith("[") and text.endswith("]"):
            try:
                tools = json.loads(text)
                return tools, None
            except json.JSONDecodeError:
                pass
        # Fall back to single JSON object
        tool, remaining_text = self.split_json_and_text(text)
        if tool:
            tools.append(tool)
        return tools, remaining_text

    async def process_query(self, query: str):
        self.messages.append({"role": "user", "content": query})
        assistant_output = ""  # guard against the retry loop exhausting before any model reply
        while True:
            retries = 0
            while retries < self.max_retries:
                print(f"Try number: {retries + 1}")
                try:
                    response = ollama.chat(
                        model=self.model,
                        messages=self.messages,
                        options={"num_predict": 1024},
                    )
                    assistant_output = response["message"]["content"].strip()
                    print(f"\nšŸ¤– Raw model output: {assistant_output}")

                    tool_calls, remaining_text = self.extract_tools_from_text(assistant_output)
                    if remaining_text:
                        print(f"šŸ’¬ Assistant: {remaining_text}")
                        self.messages.append({"role": "assistant", "content": remaining_text})
                    if not tool_calls:
                        return assistant_output

                    for tool_call in tool_calls:
                        if "tool" not in tool_call or "arguments" not in tool_call:
                            continue
                        tool_name = tool_call["tool"]
                        user_args = tool_call["arguments"]
                        validated_args = self.prepare_tool_arguments(tool_name, user_args)
                        if tool_name not in [t["name"] for t in self.available_tools]:
                            print(f"āŒ Unknown tool: {tool_name}, skipping.")
                            continue
                        try:
                            print(f"āš™ļø Calling tool `{tool_name}` with arguments {validated_args} ...")
                            tool_result = await self.call_tool(tool_name, validated_args)
                            print(f"šŸ›  Tool result: {tool_result}")
                            # Record the tool call and feed the result back to the model.
                            self.messages.append({"role": "assistant", "content": json.dumps(tool_call)})
                            self.messages.append({"role": "user", "content": f"Tool result: {tool_result}"})
                        except Exception as e:
                            print(f"āŒ Tool execution failed: {str(e)}. Retrying...")
                            retries += 1
                            await asyncio.sleep(1)
                            break
                    else:
                        # All tool calls succeeded; leave the retry loop and ask
                        # the model for its next step with the results in context.
                        break
                except json.JSONDecodeError as e:
                    print(f"⚠ JSON parse error: {str(e)}. Retrying...")
                    retries += 1
                    await asyncio.sleep(1)
            else:
                # Retry budget exhausted without a clean round of tool calls.
                print("āŒ Max retries reached. Returning raw output.")
                self.messages.append({"role": "assistant", "content": assistant_output})
                return assistant_output

    async def chat_loop(self):
        print("\nMCP Chatbot Started (Ollama/Qwen2.5:14b)!")
        print("Type your queries or 'quit' to exit.")
        while True:
            try:
                query = input("\nQuery: ").strip()
                if query.lower() == "quit":
                    break
                await self.process_query(query)
                print("\n")
            except Exception as e:
                print(f"\nError: {str(e)}")

    async def connect_to_server_and_run(self):
        # Spawn the MCP server as a subprocess and talk to it over stdio.
        server_params = StdioServerParameters(
            command="uv",
            args=["run", "main.py"],
            env=None,
        )
        async with stdio_client(server_params) as (read, write):
            async with ClientSession(read, write) as session:
                self.session = session
                await session.initialize()
                response = await session.list_tools()
                tools = response.tools
                print("\nConnected to server with tools:", [tool.name for tool in tools])
                self.available_tools = [
                    {"name": tool.name, "description": tool.description, "input_schema": tool.inputSchema}
                    for tool in response.tools
                ]
                # System prompt
                system_prompt = await self.build_system_prompt()
                self.messages.append({"role": "system", "content": system_prompt})
                await self.chat_loop()


async def main():
    chatbot = MCP_ChatBot()
    await chatbot.connect_to_server_and_run()


if __name__ == "__main__":
    asyncio.run(main())
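The client launches its server with "uv run main.py", but main.py itself is not shown on this page. Below is a minimal sketch of what a compatible stdio server could look like, using the FastMCP helper from the official mcp Python SDK; the server name and the get_user tool (with its name parameter) are illustrative assumptions, not code from this repository.

main.py (hypothetical sketch)

# Hypothetical stdio MCP server; the tool name and its fields are assumptions.
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("db-mcp-server")


@mcp.tool()
def get_user(name: str) -> str:
    """Look up a user record by name (stand-in for a real database query)."""
    return f"No database wired up; echoing back: {name}"


if __name__ == "__main__":
    # stdio transport matches the client's stdio_client(...) connection.
    mcp.run(transport="stdio")

With a server like this on disk, running "python mcp_chatbot.py" starts the client, which spawns the server over stdio, lists its tools, and injects them into the system prompt.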

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kunwarmahen/db-mcp-server'
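The same endpoint can be queried from Python. Here is a small sketch using only the standard library; the response schema is not documented on this page, so the script simply pretty-prints whatever JSON comes back.

# Fetch this server's directory entry and pretty-print it.
# The response shape is an assumption; we rely only on it being JSON.
import json
import urllib.request

URL = "https://glama.ai/api/mcp/v1/servers/kunwarmahen/db-mcp-server"

with urllib.request.urlopen(URL) as resp:
    data = json.load(resp)

print(json.dumps(data, indent=2))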

If you have feedback or need assistance with the MCP directory API, please join our Discord server.