Skip to main content
Glama
digital-duck

Model Context Protocol Demo

by digital-duck
mcp_client_simple.py16.1 kB
import asyncio import json import logging import re from pathlib import Path from typing import Dict, List, Any, Optional from fastmcp import Client # Configure logging with DEBUG level to see what's happening # other levels include (INFO, WARNING, ERROR, CRITICAL) logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') def get_mcp_server_path(server_filename="mcp_server.py"): """ Get absolute path to MCP server file in the same directory as the current script. Args: server_filename (str): Name of the MCP server file (default: "mcp_server.py") Returns: Path: Absolute path to the MCP server file Raises: FileNotFoundError: If the server file doesn't exist RuntimeError: If unable to determine current script directory """ try: # Method 1: Use __file__ if available (works in most cases) if '__file__' in globals(): current_script_dir = Path(__file__).parent.resolve() else: # Method 2: Fallback for interactive environments current_script_dir = Path.cwd() # Construct the server path server_path = current_script_dir / server_filename # Verify the file exists if not server_path.exists(): raise FileNotFoundError(f"MCP server file not found: {server_path}") return server_path.resolve() # Return absolute path except Exception as e: raise RuntimeError(f"Failed to determine MCP server path: {e}") # --- Simple Query Parser --- class QueryParser: """Parse natural language queries into tool calls""" @staticmethod def parse_calculator_query(query: str) -> Optional[Dict[str, Any]]: """Parse math queries""" query = query.lower().strip() # Operation patterns patterns = [ ("add", ["plus", "add", "+", "sum"]), ("subtract", ["minus", "subtract", "-", "difference"]), ("multiply", ["times", "multiply", "*", "×", "product"]), ("divide", ["divide", "divided by", "/", "÷"]), ] for operation, keywords in patterns: for keyword in keywords: if keyword in query: # Extract numbers numbers = re.findall(r'-?\d+(?:\.\d+)?', query) if len(numbers) >= 2: try: return { "tool": "calculator", "params": { "operation": operation, "num1": float(numbers[0]), "num2": float(numbers[1]) } } except ValueError: pass return None @staticmethod def parse_stock_query(query: str) -> Optional[Dict[str, Any]]: """Parse stock price queries""" query_lower = query.lower().strip() stock_keywords = ["stock", "price", "quote", "ticker", "share"] if any(keyword in query_lower for keyword in stock_keywords): # Extract ticker symbols (2-5 uppercase letters) tickers = re.findall(r'\b[A-Z]{2,5}\b', query.upper()) if tickers: # Filter out common words that aren't tickers excluded_words = {"GET", "THE", "FOR", "AND", "BUT", "NOT", "YOU", "ALL", "CAN", "HER", "WAS", "ONE", "OUR", "OUT", "DAY", "HAD", "HAS", "HIS", "HOW", "ITS", "MAY", "NEW", "NOW", "OLD", "SEE", "TWO", "WHO", "BOY", "DID", "CAR", "EAT", "FAR", "FUN", "GOT", "HIM", "LET", "MAN", "PUT", "SAY", "SHE", "TOO", "USE"} valid_tickers = [t for t in tickers if t not in excluded_words] if valid_tickers: return { "tool": "stock_quote", "params": { "ticker": valid_tickers[0] } } # Also check for common ticker patterns without explicit stock keywords common_tickers = ["AAPL", "GOOGL", "GOOG", "MSFT", "TSLA", "AMZN", "META", "NVDA", "AMD", "INTC"] for ticker in common_tickers: if ticker in query.upper(): return { "tool": "stock_quote", "params": { "ticker": ticker } } return None @staticmethod def parse_query(query: str) -> Optional[Dict[str, Any]]: """Main query parser""" # Health check if any(word in query.lower() for word in ["health", "status", "ping"]): return {"tool": "health", "params": {}} # Echo test if query.lower().startswith("echo "): message = query[5:].strip() return { "tool": "echo", "params": {"message": message} } # Try calculator calc_result = QueryParser.parse_calculator_query(query) if calc_result: return calc_result # Try stock query stock_result = QueryParser.parse_stock_query(query) if stock_result: return stock_result return None def extract_result_data(result): """Extract actual data from FastMCP result object""" try: # Debug: Print what we're actually getting logging.debug(f" Raw result type: {type(result)}") logging.debug(f" Raw result: {result}") # FastMCP returns a list of TextContent objects directly if isinstance(result, list) and len(result) > 0: logging.debug(f" Result is a list with {len(result)} items") content_item = result[0] logging.debug(f" Content item type: {type(content_item)}") logging.debug(f" Content item: {content_item}") if hasattr(content_item, 'text'): logging.debug(f" Content text: {content_item.text}") # Try to parse as JSON try: parsed_data = json.loads(content_item.text) logging.debug(f" Parsed JSON data: {parsed_data}") return parsed_data except json.JSONDecodeError as e: logging.debug(f" JSON decode failed: {e}") return {"text": content_item.text} else: logging.debug(f" Content item has no text attribute") return {"content": str(content_item)} # Fallback: Check if it has content attribute (old path) elif hasattr(result, 'content') and result.content: logging.debug(f" Found content attribute with {len(result.content)} items") content_item = result.content[0] if hasattr(content_item, 'text'): try: parsed_data = json.loads(content_item.text) return parsed_data except json.JSONDecodeError: return {"text": content_item.text} else: return {"content": str(content_item)} else: logging.debug(f" No recognizable structure found") # Maybe it's already the data we need? if isinstance(result, dict): logging.debug(f" Result is already a dict") return result else: logging.debug(f" Converting result to string: {str(result)}") return {"result": str(result)} except Exception as e: logging.debug(f" Exception in extract_result_data: {e}") logging.error(f"Error extracting result data: {e}", exc_info=True) return {"error": f"Could not parse result: {e}"} def format_result(tool_name: str, result: Dict) -> str: """Format tool results for display""" if isinstance(result, dict) and "error" in result: return f"❌ Error: {result['error']}" if tool_name == "calculator": if "result" in result: return f"""🧮 {result.get('expression', f'{result["num1"]} {result["operation"]} {result["num2"]} = {result["result"]}')}""" elif "error" in result: return f"❌ Calculator Error: {result['error']}" elif tool_name == "stock_quote": if "current_price" in result: name = result.get('company_name', result['ticker']) price = result['current_price'] currency = result.get('currency', 'USD') extra_info = [] if result.get('volume'): extra_info.append(f"Vol: {result['volume']:,}") if result.get('day_high') and result.get('day_low'): extra_info.append(f"Range: {result['day_low']}-{result['day_high']}") extra = f" ({', '.join(extra_info)})" if extra_info else "" return f"📈 {name} ({result['ticker']}): {currency} {price}{extra}" elif "error" in result: return f"❌ Stock Error: {result['error']}" elif tool_name == "health": return f"✅ {result.get('message', 'Server is healthy')}" elif tool_name == "echo": return f"🔊 {result.get('echo', result.get('message', str(result)))}" return f"✅ Result: {json.dumps(result, indent=2)}" # --- Main Demo --- async def run_demo(): logging.info("🚀 MCP Client Demo Starting...") try: # Connect to the FastMCP server directly server_path = str(get_mcp_server_path(server_filename="mcp_server.py")) logging.info(f"📡 Connecting to MCP server: {server_path}") async with Client(server_path) as client: logging.info("✅ Connected to MCP server!") # Test with health check logging.info("\n🩺 Testing with health check...") try: health_result = await client.call_tool("health", {}) result_data = extract_result_data(health_result) logging.info(format_result("health", result_data)) except Exception as e: logging.error(f"⚠️ Health check failed: {e}") logging.error(f"Health check error: {e}", exc_info=True) # Discover tools logging.info("\n🔍 Discovering available tools...") try: tools = await client.list_tools() if tools.tools: logging.info(f"✅ Found {len(tools.tools)} tools:") for tool in tools.tools: logging.info(f" • {tool.name}: {tool.description}") else: logging.error("⚠️ No tools discovered") except Exception as e: logging.error(f"⚠️ Tool discovery failed: {e}") tools = None # Discover resources logging.info("\n📚 Discovering available resources...") try: resources = await client.list_resources() if resources.resources: logging.info(f"✅ Found {len(resources.resources)} resources:") for resource in resources.resources: logging.info(f" • {resource.uri}: {resource.description}") else: logging.error("⚠️ No resources discovered") except Exception as e: logging.error(f"⚠️ Resource discovery failed: {e}") print(f"\n{'='*60}") print("🎯 Interactive Demo Started!") print("\n📝 Try these example queries:") print(" • 'What is 15 plus 27?'") print(" • 'Calculate 100 divided by 4'") print(" • 'Get AAPL stock price'") print(" • 'TSLA quote'") print(" • 'echo Hello World'") print(" • 'health check'") print("\n💡 Commands:") print(" • 'tools' - List available tools") print(" • 'resources' - List available resources") print(" • 'exit' - Quit the demo") print(f"{'='*60}") parser = QueryParser() while True: try: user_input = input("\n💬 Your query: ").strip() if not user_input: continue if user_input.lower() == 'exit': logging.info("👋 Goodbye!") break if user_input.lower() == 'tools': if tools and tools.tools: logging.info("\n🔧 Available tools:") for tool in tools.tools: logging.info(f" • {tool.name}: {tool.description}") else: logging.info("\n🔧 Known tools: calculator, stock_quote, health, echo") continue if user_input.lower() == 'resources': try: resources = await client.list_resources() if resources.resources: logging.info("\n📚 Available resources:") for resource in resources.resources: logging.info(f" • {resource.uri}: {resource.description}") else: logging.info("\n📚 No resources available") except Exception as e: logging.error(f"\n❌ Could not list resources: {e}") continue # Parse the query parsed_query = parser.parse_query(user_input) if not parsed_query: logging.warning("❓ I couldn't understand your query. Try rephrasing or check the examples above.") continue # Execute the tool call tool_name = parsed_query["tool"] parameters = parsed_query["params"] logging.info(f"🔧 Calling tool: {tool_name}") if parameters: logging.info(f"📝 Parameters: {json.dumps(parameters, indent=2)}") try: result = await client.call_tool(tool_name, parameters) result_data = extract_result_data(result) logging.info(format_result(tool_name, result_data)) except Exception as e: logging.error(f"❌ Error calling tool: {e}") logging.error(f"Tool call error: {e}", exc_info=True) except KeyboardInterrupt: logging.info("\n\n👋 Goodbye!") break except Exception as e: logging.error(f"Unexpected error: {e}", exc_info=True) logging.error(f"❌ Unexpected error: {e}") except Exception as e: logging.error(f"❌ Failed to connect to server: {e}") logging.error("\nMake sure the server file exists and FastMCP is installed:") logging.error(" pip install fastmcp yfinance") logging.error(f" Ensure {server_path} exists in the current directory") def main(): """Run the async demo""" asyncio.run(run_demo()) if __name__ == '__main__': main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/digital-duck/mcp_demo'

If you have feedback or need assistance with the MCP directory API, please join our Discord server