# test_mcp_functionality.py (7.66 kB)
#!/usr/bin/env python3
"""
Comprehensive test script for MCP functionality in server.py
Tests the MCP server connection and all available tools
"""
import asyncio
import os
import sys
from dotenv import load_dotenv
from fastmcp import Client
# Load environment variables from the .env file (if any) before the tests
# below check for TAVILY_API_KEY and POLYGON_API_KEY via os.getenv.
load_dotenv()
def _print_section(title):
    """Print a test-section title followed by a 40-character dashed rule."""
    print(title)
    print("-" * 40)


def _extract_text(result):
    """Return the text of the first content item of a tool result.

    Falls back to ``str(result)`` when the result has no ``content``
    attribute, the content is empty, or the first item carries no ``text``
    (so a non-text payload is still printed instead of raising).
    """
    content = getattr(result, "content", None)
    if content:
        text = getattr(content[0], "text", None)
        if text is not None:
            return text
    return str(result)


async def _run_dice_tests(client):
    """Test 2: call ``roll_dice`` with several notations and print each result."""
    _print_section("Test 2: Testing roll_dice tool")
    dice_tests = [
        ("1d20", "Single d20 roll"),
        ("2d6", "Two d6 rolls"),
        ("3d20k1", "Three d20 rolls, keep highest"),
        ("4d6k3", "Four d6 rolls, keep highest 3"),
        ("1d100", "Single d100 roll"),
    ]
    for notation, description in dice_tests:
        # Best-effort: one failing notation must not stop the remaining ones.
        try:
            print(f" Testing: {notation} ({description})")
            result = await client.call_tool("roll_dice", {"notation": notation})
            print(f" SUCCESS: Result: {result}")
        except Exception as e:
            print(f" ERROR: {e}")
    print()


async def _run_web_search_tests(client):
    """Test 3: call ``web_search`` for sample queries, or skip without TAVILY_API_KEY."""
    if not os.getenv("TAVILY_API_KEY"):
        _print_section("Test 3: Skipping web_search tool (no API key)")
        print(" WARNING: TAVILY_API_KEY not set, skipping web search tests")
        print()
        return

    _print_section("Test 3: Testing web_search tool")
    search_tests = [
        "What is Python programming?",
        "Latest news about artificial intelligence",
        "How to roll dice in D&D?",
    ]
    for query in search_tests:
        try:
            print(f" Searching: {query}")
            result = await client.call_tool("web_search", {"query": query})
            text_content = _extract_text(result)
            print(f" SUCCESS: Search completed. Result length: {len(text_content)} characters")
            # Only a 200-character preview is shown to keep the output readable.
            print(f" Preview: {text_content[:200]}...")
        except Exception as e:
            print(f" ERROR: {e}")
    print()


async def _run_stock_tests(client):
    """Test 4: call ``get_stock_ohlc`` for sample symbols, or skip without POLYGON_API_KEY."""
    if not os.getenv("POLYGON_API_KEY"):
        _print_section("Test 4: Skipping get_stock_ohlc tool (no API key)")
        print(" WARNING: POLYGON_API_KEY not set, skipping stock data tests")
        print()
        return

    _print_section("Test 4: Testing get_stock_ohlc tool")
    stock_tests = [
        ("AAPL", "Apple Inc."),
        ("MSFT", "Microsoft Corporation"),
        ("GOOGL", "Alphabet Inc."),
        ("TSLA", "Tesla Inc."),
    ]
    for symbol, company in stock_tests:
        try:
            print(f" Fetching stock data for: {symbol} ({company})")
            result = await client.call_tool("get_stock_ohlc", {"symbol": symbol})
            text_content = _extract_text(result)
            print(" SUCCESS: Stock data retrieved")
            print(f" Data: {text_content}")
        except Exception as e:
            print(f" ERROR: {e}")
    print()


async def _run_error_tests(client):
    """Test 5: issue calls that are expected to fail and report how each one ends."""
    _print_section("Test 5: Testing error handling")
    error_tests = [
        ("roll_dice", {"notation": "invalid_notation"}, "Invalid dice notation"),
        ("roll_dice", {"notation": "1d20", "num_rolls": -1}, "Negative number of rolls"),
        ("get_stock_ohlc", {"symbol": "INVALID_SYMBOL"}, "Invalid stock symbol"),
        ("nonexistent_tool", {"param": "value"}, "Non-existent tool"),
    ]
    for tool_name, params, description in error_tests:
        # Here an exception is the passing outcome; a normal return is flagged.
        try:
            print(f" Testing: {description}")
            result = await client.call_tool(tool_name, params)
            print(f" WARNING: Unexpected success: {result}")
        except Exception as e:
            print(f" SUCCESS: Expected error caught: {type(e).__name__}: {e}")
    print()


async def test_mcp_functionality() -> bool:
    """Exercise the MCP server in server.py and print a report for each tool.

    Opens a ``Client`` on ``"server.py"``, lists the available tools, then
    runs the roll_dice, web_search, get_stock_ohlc and error-handling
    sections in that order. The web_search and get_stock_ohlc sections are
    skipped when their API key is not set in the environment.

    Returns:
        True when the client session ran to completion. Failures of
        individual tool calls are printed but do not affect the result.
        False when an exception escaped the session (for example the client
        could not connect); the error is printed, not raised.
    """
    print("Testing MCP Functionality in server.py")
    print("=" * 50)

    # Warn up front about API keys that will cause sections to be skipped.
    missing_keys = [
        key for key in ("TAVILY_API_KEY", "POLYGON_API_KEY") if not os.getenv(key)
    ]
    if missing_keys:
        print(f"WARNING: Missing API keys: {', '.join(missing_keys)}")
        print(" Set these in your .env file to test all functionality.")
        print()

    try:
        # Test 1: Connect to MCP server and list tools
        _print_section("Test 1: Connecting to MCP server and listing tools")
        async with Client("server.py") as client:
            tools = await client.list_tools()
            print("SUCCESS: Successfully connected to MCP server")
            print(f"Available tools: {len(tools)}")
            for tool in tools:
                print(f" - {tool.name}: {tool.description}")
            print()

            await _run_dice_tests(client)
            await _run_web_search_tests(client)
            await _run_stock_tests(client)
            await _run_error_tests(client)

            print("All MCP functionality tests completed!")
    except Exception as e:
        # Top-level boundary: report and signal failure through the return value.
        print(f"ERROR: Failed to connect to MCP server: {e}")
        print(" Make sure server.py is working and dependencies are installed")
        return False
    return True
async def test_specific_dice_roll():
    """Roll ``1d20k1`` through the ``roll_dice`` tool and print the outcome.

    This is the specific roll requested by the user. Any exception raised
    while connecting or calling the tool is printed rather than propagated.
    """
    heading = "\nTesting specific request: Roll 1d20 and keep highest"
    print(heading)
    print("=" * 50)

    # 1d20k1: 1 die, 20 sides, keep the 1 highest roll.
    arguments = {"notation": "1d20k1"}
    try:
        async with Client("server.py") as mcp_client:
            outcome = await mcp_client.call_tool("roll_dice", arguments)
            print(f"Result: {outcome}")
            print("SUCCESS: Successfully rolled 1d20 and kept the highest!")
    except Exception as exc:
        print(f"ERROR: Error testing specific dice roll: {exc}")
if __name__ == "__main__":
    # Script entry point: run the comprehensive suite, then the single
    # user-requested roll, and report the overall outcome via the exit status.
    print("Starting MCP functionality tests...")
    print()

    # Run the comprehensive tests
    success = asyncio.run(test_mcp_functionality())

    if success:
        # Run the specific test requested by user (only after the
        # comprehensive run reported success).
        asyncio.run(test_specific_dice_roll())

    print("\nTesting completed!")

    # Exit non-zero when the comprehensive run failed so that shells and CI
    # can detect the failure instead of always seeing status 0.
    if not success:
        sys.exit(1)