
Model Context Protocol Demo

by digital-duck
mcp_client_llm.py (27.2 kB)
import asyncio
import json
import logging
import os
from typing import Dict, List, Any, Optional

from fastmcp import Client

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# LLM Integration Options - Choose one:
# Option 1: OpenAI API
# Option 2: Anthropic Claude API
# Option 3: Local LLM via Ollama
# Option 4: Google Gemini API
# Option 5: AWS Bedrock Claude 3.5
LLM_MODELS = ["openai", "anthropic", "ollama", "gemini", "bedrock"]
LLM_PROVIDER = "gemini"


# --- LLM Query Parser ---
class LLMQueryParser:
    """Use LLM to parse natural language queries into tool calls"""

    def __init__(self, provider: str = "ollama"):
        self.provider = provider
        self.setup_llm_client()

    def setup_llm_client(self):
        """Setup the appropriate LLM client based on provider"""
        if self.provider == "openai":
            try:
                import openai
                self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
                logging.info("✅ OpenAI client initialized")
            except ImportError:
                logging.error("❌ OpenAI library not installed. Run: pip install openai")
                self.client = None

        elif self.provider == "anthropic":
            try:
                import anthropic
                self.client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
                logging.info("✅ Anthropic client initialized")
            except ImportError:
                logging.error("❌ Anthropic library not installed. Run: pip install anthropic")
                self.client = None

        elif self.provider == "ollama":
            try:
                import requests
                # Test if Ollama is running
                response = requests.get("http://localhost:11434/api/tags", timeout=5)
                if response.status_code == 200:
                    logging.info("✅ Ollama server detected")
                    self.client = "ollama"  # Use string indicator
                else:
                    logging.error("❌ Ollama server not responding")
                    self.client = None
            except Exception as e:
                logging.error(f"❌ Ollama connection failed: {e}")
                logging.error("Make sure Ollama is installed and running: https://ollama.ai")
                self.client = None

        elif self.provider == "gemini":
            try:
                import google.generativeai as genai
                api_key = os.getenv("GEMINI_API_KEY")
                if api_key:
                    genai.configure(api_key=api_key)
                    # https://ai.google.dev/gemini-api/docs/models
                    gemini_models = [
                        "gemini-2.5-flash-preview-05-20",
                        "gemini-2.5-pro-preview-05-06",
                        "gemini-2.0-flash",
                        "gemini-1.5-flash",
                        "gemini-1.5-pro",
                    ]
                    model_name = gemini_models[3]
                    self.client = genai.GenerativeModel(model_name)
                    logging.info("✅ Gemini client initialized")
                else:
                    logging.error("❌ GEMINI_API_KEY not set")
                    self.client = None
            except ImportError:
                logging.error("❌ Gemini library not installed. Run: pip install google-generativeai")
                self.client = None

        elif self.provider == "bedrock":
            try:
                import boto3
                from botocore.exceptions import NoCredentialsError, PartialCredentialsError

                # Initialize Bedrock client
                # AWS credentials should be configured via:
                # 1. AWS CLI: aws configure
                # 2. Environment variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION
                # 3. IAM roles (if running on EC2)
                # 4. AWS SSO
                region = os.getenv("AWS_DEFAULT_REGION", "us-east-1")

                try:
                    self.client = boto3.client(
                        service_name='bedrock-runtime',
                        region_name=region
                    )
                    # Test connection with a simple call
                    # Note: We'll test this during the first actual query to avoid unnecessary calls
                    self.bedrock_model_id = "anthropic.claude-3-5-sonnet-20241022-v2:0"  # Claude 3.5 Sonnet
                    logging.info(f"✅ AWS Bedrock client initialized (region: {region})")
                    logging.info(f"📋 Using model: {self.bedrock_model_id}")
                except (NoCredentialsError, PartialCredentialsError) as e:
                    logging.error(f"❌ AWS credentials not configured: {e}")
                    logging.error("Configure AWS credentials using one of these methods:")
                    logging.error("  1. AWS CLI: aws configure")
                    logging.error("  2. Environment variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY")
                    logging.error("  3. IAM roles (if running on AWS)")
                    self.client = None
            except ImportError:
                logging.error("❌ Boto3 library not installed. Run: pip install boto3")
                self.client = None

    def get_system_prompt(self, available_tools: List[Dict]) -> str:
        """Create system prompt with available tools"""
        tools_desc = "\n".join([
            f"- {tool['name']}: {tool.get('description', 'No description')}"
            for tool in available_tools
        ])

        return f"""You are a tool selection assistant. Given a user query, you must decide which tool to use and extract the necessary parameters.

Available tools:
{tools_desc}

For each user query, respond with ONLY a JSON object in this exact format:
{{
    "tool": "tool_name",
    "params": {{
        "param1": "value1",
        "param2": "value2"
    }},
    "confidence": 0.95,
    "reasoning": "Brief explanation of why this tool was chosen"
}}

Tool-specific parameter requirements:
- calculator: operation (add/subtract/multiply/divide), num1 (number), num2 (number)
- trig: operation (sine/cosine/tangent/arc sine/arc cosine/arc tangent), num1 (float), unit (str)
- stock_quote: ticker (stock symbol like AAPL, MSFT)
- health: no parameters needed
- echo: message (text to echo back)

If you cannot determine which tool to use or extract parameters, respond with:
{{
    "tool": null,
    "params": {{}},
    "confidence": 0.0,
    "reasoning": "Could not parse the query"
}}

Examples:
User: "What is 15 plus 27?"
Response: {{"tool": "calculator", "params": {{"operation": "add", "num1": 15, "num2": 27}}, "confidence": 0.98, "reasoning": "Clear arithmetic addition request"}}

User: "What is sine 30 degrees?"
Response: {{"tool": "trig", "params": {{"operation": "sine", "num1": 30, "unit": "degree"}}, "confidence": 0.98, "reasoning": "Clear trigonometric sine function request"}}

User: "Get Apple stock price"
Response: {{"tool": "stock_quote", "params": {{"ticker": "AAPL"}}, "confidence": 0.95, "reasoning": "Request for Apple (AAPL) stock information"}}

User: "echo hello world"
Response: {{"tool": "echo", "params": {{"message": "hello world"}}, "confidence": 0.99, "reasoning": "Direct echo command"}}

User: "server status"
Response: {{"tool": "health", "params": {{}}, "confidence": 0.90, "reasoning": "Request for server health information"}}

Remember: Respond with ONLY the JSON object, no additional text."""

    async def parse_query_with_llm(self, query: str, available_tools: List[Dict]) -> Optional[Dict[str, Any]]:
        """Use LLM to parse the query"""
        if not self.client:
            logging.error("❌ LLM client not available, falling back to rule-based parsing")
            return None

        system_prompt = self.get_system_prompt(available_tools)

        try:
            if self.provider == "openai":
                response = self.client.chat.completions.create(
                    model="gpt-3.5-turbo",
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": query}
                    ],
                    temperature=0.1,
                    max_tokens=200
                )
                llm_response = response.choices[0].message.content.strip()

            elif self.provider == "anthropic":
                response = self.client.messages.create(
                    model="claude-3-haiku-20240307",
                    max_tokens=200,
                    temperature=0.1,
                    system=system_prompt,
                    messages=[{"role": "user", "content": query}]
                )
                llm_response = response.content[0].text.strip()

            elif self.provider == "ollama":
                import requests
                payload = {
                    "model": "llama3.2",  # or "mistral", "codellama", etc.
                    "prompt": f"{system_prompt}\n\nUser: {query}\nResponse:",
                    "stream": False,
                    "options": {
                        "temperature": 0.1,
                        "num_predict": 200
                    }
                }
                response = requests.post("http://localhost:11434/api/generate", json=payload, timeout=30)
                if response.status_code == 200:
                    llm_response = response.json()["response"].strip()
                else:
                    logging.error(f"Ollama API error: {response.status_code}")
                    return None

            elif self.provider == "gemini":
                response = self.client.generate_content(
                    f"{system_prompt}\n\nUser: {query}",
                    generation_config={
                        "temperature": 0.1,
                        "max_output_tokens": 200
                    }
                )
                llm_response = response.text.strip()

            elif self.provider == "bedrock":
                import json as json_lib

                # Construct the request for Claude 3.5 Sonnet
                request_body = {
                    "anthropic_version": "bedrock-2023-05-31",
                    "max_tokens": 200,
                    "temperature": 0.1,
                    "system": system_prompt,
                    "messages": [
                        {
                            "role": "user",
                            "content": query
                        }
                    ]
                }

                # Make the request to Bedrock
                response = self.client.invoke_model(
                    modelId=self.bedrock_model_id,
                    body=json_lib.dumps(request_body),
                    contentType="application/json",
                    accept="application/json"
                )

                # Parse the response
                response_body = json_lib.loads(response['body'].read())
                llm_response = response_body['content'][0]['text'].strip()
                logging.debug(f"Bedrock response usage: {response_body.get('usage', {})}")

            # Parse the JSON response
            logging.debug(f"LLM Response: {llm_response}")

            # Clean up the response - sometimes LLMs add extra text
            if llm_response.startswith("```json"):
                llm_response = llm_response.replace("```json", "").replace("```", "").strip()

            parsed_response = json.loads(llm_response)

            # Validate the response structure
            if not isinstance(parsed_response, dict):
                logging.error("LLM response is not a dictionary")
                return None

            if parsed_response.get("tool") is None:
                logging.warning(f"LLM could not parse query: {parsed_response.get('reasoning', 'Unknown reason')}")
                return None

            confidence = parsed_response.get("confidence", 0.0)
            if confidence < 0.5:
                logging.warning(f"Low confidence ({confidence}) in LLM parsing: {parsed_response.get('reasoning')}")
                return None

            logging.info(f"🤖 LLM parsed query: {parsed_response.get('reasoning')} (confidence: {confidence})")

            return {
                "tool": parsed_response["tool"],
                "params": parsed_response.get("params", {}),
                "confidence": confidence,
                "reasoning": parsed_response.get("reasoning", "")
            }

        except json.JSONDecodeError as e:
            logging.error(f"Failed to parse LLM JSON response: {e}")
            logging.error(f"Raw LLM response: {llm_response}")
            return None
        except Exception as e:
            logging.error(f"LLM query parsing error: {e}")
            return None


# --- Fallback Rule-Based Parser (same as before) ---
class RuleBasedQueryParser:
    """Fallback rule-based query parser"""

    @staticmethod
    def parse_calculator_query(query: str) -> Optional[Dict[str, Any]]:
        import re
        query = query.lower().strip()
        patterns = [
            ("add", ["plus", "add", "+", "sum"]),
            ("subtract", ["minus", "subtract", "-", "difference"]),
            ("multiply", ["times", "multiply", "*", "×", "product"]),
            ("divide", ["divide", "divided by", "/", "÷"]),
        ]
        for operation, keywords in patterns:
            for keyword in keywords:
                if keyword in query:
                    numbers = re.findall(r'-?\d+(?:\.\d+)?', query)
                    if len(numbers) >= 2:
                        try:
                            return {
                                "tool": "calculator",
                                "params": {
                                    "operation": operation,
                                    "num1": float(numbers[0]),
                                    "num2": float(numbers[1])
                                }
                            }
                        except ValueError:
                            pass
        return None

    @staticmethod
    def parse_stock_query(query: str) -> Optional[Dict[str, Any]]:
        import re
        query_lower = query.lower().strip()
        stock_keywords = ["stock", "price", "quote", "ticker", "share"]
        if any(keyword in query_lower for keyword in stock_keywords):
            tickers = re.findall(r'\b[A-Z]{2,5}\b', query.upper())
            if tickers:
                excluded_words = {"GET", "THE", "FOR", "AND", "BUT", "NOT", "YOU", "ALL", "CAN"}
                valid_tickers = [t for t in tickers if t not in excluded_words]
                if valid_tickers:
                    return {
                        "tool": "stock_quote",
                        "params": {"ticker": valid_tickers[0]}
                    }
            common_tickers = ["AAPL", "GOOGL", "GOOG", "MSFT", "TSLA", "AMZN", "META", "NVDA"]
            for ticker in common_tickers:
                if ticker in query.upper():
                    return {
                        "tool": "stock_quote",
                        "params": {"ticker": ticker}
                    }
        return None

    @staticmethod
    def parse_query(query: str) -> Optional[Dict[str, Any]]:
        if any(word in query.lower() for word in ["health", "status", "ping"]):
            return {"tool": "health", "params": {}}
        if query.lower().startswith("echo "):
            message = query[5:].strip()
            return {"tool": "echo", "params": {"message": message}}
        calc_result = RuleBasedQueryParser.parse_calculator_query(query)
        if calc_result:
            return calc_result
        stock_result = RuleBasedQueryParser.parse_stock_query(query)
        if stock_result:
            return stock_result
        return None


# --- Same extraction and formatting functions as before ---
def extract_result_data(result):
    """Extract actual data from FastMCP result object"""
    try:
        if isinstance(result, list) and len(result) > 0:
            content_item = result[0]
            if hasattr(content_item, 'text'):
                try:
                    return json.loads(content_item.text)
                except json.JSONDecodeError:
                    return {"text": content_item.text}
            else:
                return {"content": str(content_item)}
        elif hasattr(result, 'content') and result.content:
            content_item = result.content[0]
            if hasattr(content_item, 'text'):
                try:
                    return json.loads(content_item.text)
                except json.JSONDecodeError:
                    return {"text": content_item.text}
            else:
                return {"content": str(content_item)}
        else:
            if isinstance(result, dict):
                return result
            else:
                return {"result": str(result)}
    except Exception as e:
        logging.error(f"Error extracting result data: {e}")
        return {"error": f"Could not parse result: {e}"}


def format_result(tool_name: str, result: Dict) -> str:
    """Format tool results for display"""
    if isinstance(result, dict) and "error" in result:
        return f"❌ Error: {result['error']}"

    if tool_name == "calculator":
        if "result" in result:
            expression = result.get('expression')
            if expression:
                return f"🧮 {expression}"
            else:
                num1 = result.get('num1', '?')
                num2 = result.get('num2', '?')
                operation = result.get('operation', '?')
                calc_result = result.get('result', '?')
                return f"🧮 {num1} {operation} {num2} = {calc_result}"
        elif "error" in result:
            return f"❌ Calculator Error: {result['error']}"

    elif tool_name == "stock_quote":
        if "current_price" in result:
            ticker = result.get('ticker', 'Unknown')
            name = result.get('company_name', ticker)
            price = result.get('current_price', 0)
            currency = result.get('currency', 'USD')
            extra_info = []
            if result.get('volume'):
                extra_info.append(f"Vol: {result['volume']:,}")
            if result.get('day_high') and result.get('day_low'):
                extra_info.append(f"Range: {result['day_low']}-{result['day_high']}")
            extra = f" ({', '.join(extra_info)})" if extra_info else ""
            return f"📈 {name} ({ticker}): {currency} {price}{extra}"
        elif "error" in result:
            return f"❌ Stock Error: {result['error']}"

    elif tool_name == "health":
        if isinstance(result, dict):
            return f"✅ {result.get('message', 'Server is healthy')}"

    elif tool_name == "echo":
        if isinstance(result, dict):
            return f"🔊 {result.get('echo', result.get('message', str(result)))}"

    try:
        return f"✅ Result: {json.dumps(result, indent=2)}"
    except (TypeError, ValueError):
        return f"✅ Result: {str(result)}"


# --- Main Demo with LLM Integration ---
async def run_llm_demo():
    print("🚀 LLM-powered MCP client demo starting...")
    print(f"🤖 Using LLM Provider: {LLM_PROVIDER}")

    # Initialize parsers
    llm_parser = LLMQueryParser(LLM_PROVIDER)
    fallback_parser = RuleBasedQueryParser()

    server_path = "mcp_server.py"

    try:
        print(f"📡 Connecting to MCP server: {server_path}")
        async with Client(server_path) as client:
            print("✅ Connected to MCP server (local)!")

            # Discover available tools for LLM context
            available_tools = []
            try:
                tools = await client.list_tools()
                # print(f"tools: {tools}")
                if tools:
                    available_tools = [
                        {"name": tool.name, "description": tool.description}
                        for tool in tools
                    ]
                    print(f"✅ Found {len(available_tools)} tools for LLM context")
                else:
                    # Fallback tool definitions
                    available_tools = [
                        {"name": "calculator", "description": "Perform arithmetic operations"},
                        {"name": "trig", "description": "Performs trigonometric operations"},
                        {"name": "stock_quote", "description": "Get stock price data"},
                        {"name": "health", "description": "Check server health"},
                        {"name": "echo", "description": "Echo back messages"}
                    ]
            except Exception as e:
                logging.error(f"Tool discovery failed: {e}")

            print(f"\n{'='*60}")
            print("🎯 LLM-Powered Interactive Demo Started!")
            print("\n📝 Try natural language queries:")
            print("   • 'What's fifteen plus twenty seven?'")
            print("   • 'Can you multiply 12 by 8?'")
            print("   • 'Find sine of 30 degrees'")
            print("   • 'I need the current Apple stock price'")
            print("   • 'How much is Tesla trading for?'")
            print("   • 'Please echo back: Hello AI!'")
            print("   • 'Is the server working properly?'")
            print("\n💡 Commands:")
            print("   • 'tools' - List available tools")
            print("   • 'switch' - Switch parsing mode")
            print("   • 'exit' - Quit the demo")
            print(f"{'='*60}")

            use_llm = llm_parser.client is not None

            while True:
                try:
                    user_input = input(f"\n💬 Your query {'🤖' if use_llm else '🔧'}: ").strip()

                    if not user_input:
                        continue

                    if user_input.lower() in ('exit', "quit", "bye", "q"):
                        print("👋 Goodbye!")
                        break

                    if user_input.lower() == 'switch':
                        use_llm = not use_llm
                        mode = "LLM" if use_llm else "Rule-based"
                        print(f"🔄 Switched to {mode} parsing mode")
                        continue

                    if user_input.lower() == 'tools':
                        print("\n🔧 Available tools:")
                        for tool in available_tools:
                            print(f"   • {tool['name']}: {tool['description']}")
                        continue

                    # Parse the query
                    parsed_query = None
                    if use_llm and llm_parser.client:
                        parsed_query = await llm_parser.parse_query_with_llm(user_input, available_tools)

                    # Fallback to rule-based if LLM fails
                    if not parsed_query:
                        if use_llm:
                            print("🔄 LLM parsing failed, trying rule-based parser...")
                        parsed_query = fallback_parser.parse_query(user_input)

                    if not parsed_query:
                        print("❓ I couldn't understand your query. Try rephrasing or being more specific.")
                        continue

                    # Execute the tool call
                    tool_name = parsed_query["tool"]
                    parameters = parsed_query["params"]

                    print(f"🔧 Calling tool: {tool_name}")
                    if parsed_query.get("reasoning"):
                        print(f"💭 Reasoning: {parsed_query['reasoning']}")
                    if parameters:
                        print(f"📝 Parameters: {json.dumps(parameters, indent=2)}")

                    try:
                        result = await client.call_tool(tool_name, parameters)
                        result_data = extract_result_data(result)
                        print(format_result(tool_name, result_data))
                    except Exception as e:
                        print(f"❌ Error calling tool: {e}")
                        logging.error(f"Tool call error: {e}", exc_info=True)

                except KeyboardInterrupt:
                    print("\n\n👋 Goodbye!")
                    break
                except Exception as e:
                    logging.error(f"Unexpected error: {e}", exc_info=True)
                    print(f"❌ Unexpected error: {e}")

    except Exception as e:
        print(f"❌ Failed to connect to server: {e}")
        print("\nMake sure the server file exists and FastMCP is installed:")
        print("   pip install fastmcp yfinance")
        print(f"   Ensure {server_path} exists in the current directory")


def main():
    """Run the LLM-powered async demo"""
    print("🤖 LLM-Powered MCP Client")
    print("=" * 40)
    print(f"Selected LLM Provider: {LLM_PROVIDER}")

    if LLM_PROVIDER == "ollama":
        print("📋 Ollama Setup Instructions:")
        print("1. Install Ollama: https://ollama.ai")
        print("2. Run: ollama pull llama3.2")
        print("3. Start Ollama service")
    elif LLM_PROVIDER == "openai":
        print("📋 Set environment variable: OPENAI_API_KEY")
    elif LLM_PROVIDER == "anthropic":
        print("📋 Set environment variable: ANTHROPIC_API_KEY")
    elif LLM_PROVIDER == "gemini":
        print("📋 Set environment variable: GEMINI_API_KEY")
    elif LLM_PROVIDER == "bedrock":
        print("📋 AWS Bedrock Setup Instructions:")
        print("1. Install boto3: pip install boto3")
        print("2. Configure AWS credentials (choose one):")
        print("   • AWS CLI: aws configure")
        print("   • Environment variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION")
        print("   • IAM roles (if running on AWS)")
        print("   • AWS SSO: aws sso login")
        print("3. Ensure you have access to Claude 3.5 Sonnet in Bedrock")
        print("4. Set AWS_DEFAULT_REGION if needed (default: us-east-1)")
    print("=" * 40)

    asyncio.run(run_llm_demo())


if __name__ == '__main__':
    main()
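
The client launches a companion script, mcp_server.py, from the same directory; the FastMCP Client infers a stdio transport from the .py path and starts the server as a subprocess. The repository ships its own server, which is not reproduced here. The sketch below is only a hypothetical stand-in built on FastMCP's @mcp.tool decorator API, with simplified parameter and return shapes chosen to match what format_result() above expects; the real server's tool signatures and outputs may differ.

# mcp_server_sketch.py - minimal illustrative companion server (hypothetical)
import math

import yfinance as yf  # only needed for the stock_quote sketch
from fastmcp import FastMCP

mcp = FastMCP("mcp-demo-server")


@mcp.tool()
def calculator(operation: str, num1: float, num2: float) -> dict:
    """Basic arithmetic: add, subtract, multiply, divide."""
    ops = {
        "add": num1 + num2,
        "subtract": num1 - num2,
        "multiply": num1 * num2,
        "divide": num1 / num2 if num2 != 0 else None,
    }
    if operation not in ops or ops[operation] is None:
        return {"error": f"Unsupported operation or division by zero: {operation}"}
    return {"operation": operation, "num1": num1, "num2": num2, "result": ops[operation]}


@mcp.tool()
def trig(operation: str, num1: float, unit: str = "degree") -> dict:
    """Trigonometric functions; accepts angles in degrees or radians (simplified)."""
    x = math.radians(num1) if unit.lower().startswith("degree") else num1
    funcs = {"sine": math.sin, "cosine": math.cos, "tangent": math.tan}
    if operation not in funcs:
        return {"error": f"Unsupported operation: {operation}"}
    return {"operation": operation, "num1": num1, "unit": unit, "result": funcs[operation](x)}


@mcp.tool()
def stock_quote(ticker: str) -> dict:
    """Fetch a quote via yfinance (simplified; field names are assumptions)."""
    info = yf.Ticker(ticker).info
    return {
        "ticker": ticker,
        "company_name": info.get("shortName", ticker),
        "current_price": info.get("regularMarketPrice"),
        "currency": info.get("currency", "USD"),
    }


@mcp.tool()
def echo(message: str) -> dict:
    """Echo a message back to the caller."""
    return {"echo": message}


@mcp.tool()
def health() -> dict:
    """Simple liveness check."""
    return {"status": "ok", "message": "Server is healthy"}


if __name__ == "__main__":
    mcp.run()  # stdio transport by default, which Client("mcp_server.py") expects

With a server like this saved as mcp_server.py, setting the API key for the selected LLM_PROVIDER and running python mcp_client_llm.py starts the interactive demo end to end.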
