# demo.py (4.96 kB)
"""
Demo script that exercises the weather API endpoints and simulates the MCP server's tool calls.

Run this after starting the FastAPI server (python weather_api.py) on http://localhost:8000.
"""
import asyncio
import httpx
import json
async def test_weather_api():
    """Exercise the FastAPI weather endpoints and print each result.

    Sends, in order, to the server at ``http://localhost:8000``:

    1. ``GET /health``
    2. ``GET /weather?city=London``
    3. ``POST /weather`` with a JSON body for Tokyo
    4. ``GET /weather`` once per city in a short list

    Every step is best-effort: a failure is printed and the next step still
    runs, so one broken endpoint does not abort the rest of the demo.
    """
    base_url = "http://localhost:8000"
    print("=" * 60)
    print("Testing Weather API")
    print("=" * 60)
    async with httpx.AsyncClient() as client:
        # Test health endpoint. The whole body is printed, so an error
        # payload stays visible without an explicit status check.
        print("\n1. Testing Health Check:")
        try:
            response = await client.get(f"{base_url}/health")
            print(f" Status: {response.status_code}")
            print(f" Response: {response.json()}")
        except Exception as e:  # demo boundary: report and keep going
            print(f" Error: {e}")

        # Test weather endpoint (GET)
        print("\n2. Testing Weather Endpoint (GET) - London:")
        try:
            response = await client.get(f"{base_url}/weather", params={"city": "London"})
            print(f" Status: {response.status_code}")
            # Fail on an unsuccessful status *before* indexing the payload;
            # otherwise an error body surfaces as a cryptic KeyError('city').
            response.raise_for_status()
            weather = response.json()
            print(f" City: {weather['city']}")
            print(f" Temperature: {weather['temperature']}°C")
            print(f" Condition: {weather['condition']}")
            print(f" Full Response: {json.dumps(weather, indent=2)}")
        except Exception as e:  # demo boundary: report and keep going
            print(f" Error: {e}")

        # Test weather endpoint (POST)
        print("\n3. Testing Weather Endpoint (POST) - Tokyo:")
        try:
            response = await client.post(
                f"{base_url}/weather",
                json={"city": "Tokyo", "country": "Japan"},
            )
            print(f" Status: {response.status_code}")
            # Same reasoning as above: report the HTTP failure itself.
            response.raise_for_status()
            weather = response.json()
            print(f" City: {weather['city']}, {weather['country']}")
            print(f" Temperature: {weather['temperature']}°C")
            print(f" Condition: {weather['condition']}")
        except Exception as e:  # demo boundary: report and keep going
            print(f" Error: {e}")

        # Test multiple cities; each city has its own try so one failure
        # does not hide the results for the others.
        print("\n4. Testing Multiple Cities:")
        cities = ["Paris", "New York", "Sydney"]
        for city in cities:
            try:
                response = await client.get(f"{base_url}/weather", params={"city": city})
                # The status is not printed in this step, so this check is
                # the only way an HTTP error gets reported as such.
                response.raise_for_status()
                weather = response.json()
                print(f" {weather['city']}: {weather['temperature']}°C, {weather['condition']}")
            except Exception as e:  # demo boundary: report and keep going
                print(f" {city}: Error - {e}")
async def test_mcp_tools():
    """Simulate the MCP tool calls by issuing the matching HTTP requests.

    Three calls are simulated in turn -- ``get_weather``,
    ``get_weather_batch`` and ``check_api_health`` -- each with its own
    HTTP client and its own error handling, so a failure in one is printed
    and the remaining simulations still run.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("Simulating MCP Tool Calls")
    print(rule)

    base_url = "http://localhost:8000"
    weather_url = f"{base_url}/weather"
    health_url = f"{base_url}/health"

    # --- get_weather: a single city lookup ---
    print("\n1. Simulating 'get_weather' tool:")
    try:
        async with httpx.AsyncClient() as http:
            reply = await http.get(weather_url, params={"city": "Berlin"})
            berlin = reply.json()
            print(" Tool: get_weather(city='Berlin')")
            print(f" Result: {json.dumps(berlin, indent=6)}")
    except Exception as exc:
        print(f" Error: {exc}")

    # --- get_weather_batch: several cities fetched concurrently ---
    print("\n2. Simulating 'get_weather_batch' tool:")
    try:
        cities = ["Mumbai", "Dubai", "Singapore"]
        async with httpx.AsyncClient() as http:
            replies = await asyncio.gather(
                *(http.get(weather_url, params={"city": name}) for name in cities)
            )
            payloads = [reply.json() for reply in replies]
            joined = ",".join(cities)
            print(f" Tool: get_weather_batch(cities='{joined}')")
            print(f" Result: {json.dumps(payloads, indent=6)}")
    except Exception as exc:
        print(f" Error: {exc}")

    # --- check_api_health: the health endpoint ---
    print("\n3. Simulating 'check_api_health' tool:")
    try:
        async with httpx.AsyncClient() as http:
            reply = await http.get(health_url)
            status = reply.json()
            print(" Tool: check_api_health()")
            print(f" Result: {json.dumps(status, indent=6)}")
    except Exception as exc:
        print(f" Error: {exc}")
async def main():
    """Run the full demo.

    Prints a banner and a reminder to start the server, waits for the user
    to press Enter, runs the API tests followed by the MCP tool simulation,
    and finishes with usage hints for the Gemini CLI.
    """
    rule = "=" * 60

    # Opening banner.
    print("\n")
    for line in (rule, "Weather API & MCP Server Demo", rule):
        print(line)
    print("\nMake sure the FastAPI server is running on http://localhost:8000")
    print("Start it with: python weather_api.py\n")

    # Give the user a chance to bring the server up first.
    input("Press Enter to start testing...")

    await test_weather_api()
    await test_mcp_tools()

    # Closing banner and next steps.
    print(f"\n{rule}")
    print("Demo Complete!")
    print(rule)
    closing_hints = (
        "\nTo use with Gemini CLI:",
        "1. Start FastAPI server: python weather_api.py",
        "2. Configure Gemini CLI with mcp_config.json",
        "3. Run: gemini mcp list",
        "4. Run: gemini mcp call weather-info get_weather --city 'Tokyo'",
    )
    for hint in closing_hints:
        print(hint)


if __name__ == "__main__":
    # Script entry point: run the async demo to completion.
    asyncio.run(main())