# test_http_local.py (3.63 kB)
#!/usr/bin/env python3
"""Test HTTP MCP server locally"""
import httpx
import asyncio
import json
import sys
async def test_mcp_server():
    """Run a three-step smoke test against the local MCP HTTP endpoint.

    Posts JSON-RPC 2.0 ``initialize``, ``tools/list`` and ``tools/call``
    (for the ``opportunity_discovery`` tool) requests to
    ``http://localhost:8081/mcp`` and prints a pass/fail line for each step.
    A failing step never aborts the run; the remaining steps are still
    attempted.

    Returns:
        None. All results are reported on stdout.
    """
    base_url = "http://localhost:8081/mcp"
    # One header set shared by every request: the MCP Streamable HTTP
    # transport expects clients to accept both plain JSON and SSE replies on
    # each POST, not only on the initialize call.
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream",
    }

    async with httpx.AsyncClient(timeout=30.0) as client:
        print("Testing MCP HTTP Server...")
        print("=" * 50)

        # Test 1: Initialize
        print("\n1. Testing initialization...")
        try:
            response = await client.post(
                base_url,
                headers=headers,
                json={
                    "jsonrpc": "2.0",
                    "method": "initialize",
                    "params": {
                        "clientInfo": {
                            "name": "test-client",
                            "version": "1.0.0",
                        }
                    },
                    "id": 1,
                },
            )
            print(f"Status: {response.status_code}")
            # If the server issued a session id, echo it on later requests;
            # servers that do not use sessions simply omit the header.
            session_id = response.headers.get("mcp-session-id")
            if session_id:
                headers["Mcp-Session-Id"] = session_id
            result = _rpc_result(response)
            if result is not None:
                print("✅ Initialization successful")
                print(f"Server: {result.get('serverInfo', {})}")
        except Exception as e:
            # Deliberately broad: this is a best-effort smoke test and one
            # failing step must not prevent the following steps from running.
            print(f"❌ Error: {e}")

        # Test 2: List tools
        print("\n2. Testing tools/list...")
        try:
            response = await client.post(
                base_url,
                headers=headers,
                json={
                    "jsonrpc": "2.0",
                    "method": "tools/list",
                    "id": 2,
                },
            )
            result = _rpc_result(response)
            if result is not None:
                tools = result.get("tools", [])
                print(f"✅ Found {len(tools)} tools:")
                for tool in tools:
                    print(f"   - {tool['name']}")
        except Exception as e:
            print(f"❌ Error: {e}")

        # Test 3: Call opportunity_discovery tool
        print("\n3. Testing opportunity_discovery tool...")
        try:
            response = await client.post(
                base_url,
                headers=headers,
                json={
                    "jsonrpc": "2.0",
                    "method": "tools/call",
                    "params": {
                        "name": "opportunity_discovery",
                        "arguments": {
                            "query": "renewable energy",
                            "max_results": 5,
                        },
                    },
                    "id": 3,
                },
                timeout=60.0,  # Longer timeout for API calls
            )
            result = _rpc_result(response)
            if result is not None:
                print("✅ Tool call successful")
                if "content" in result and result["content"]:
                    content = result["content"][0].get("text", "")
                    print(f"Response preview: {content[:200]}...")
        except Exception as e:
            print(f"❌ Error: {e}")

        print("\n" + "=" * 50)
        print("Testing complete!")


def _rpc_result(response):
    """Return the JSON-RPC ``result`` of *response*, or None if the call failed.

    A failure is either a non-200 HTTP status or a 200 whose body carries a
    JSON-RPC ``error`` member; in both cases the reason is printed here so the
    caller only has to handle the success path.

    Raises:
        ValueError: if the body cannot be decoded into a JSON-RPC object.
    """
    if response.status_code != 200:
        print(f"❌ Failed: {response.text}")
        return None
    message = _decode_rpc_message(response)
    if not isinstance(message, dict):
        raise ValueError(f"unexpected JSON-RPC payload: {message!r}")
    # JSON-RPC reports application errors inside the body, so HTTP 200 alone
    # does not mean the call succeeded.
    if message.get("error") is not None:
        print(f"❌ JSON-RPC error: {message['error']}")
        return None
    return message.get("result") or {}


def _decode_rpc_message(response):
    """Decode the JSON-RPC message in *response*, whether plain JSON or SSE.

    For a ``text/event-stream`` body, the ``data:`` lines of each event are
    joined and parsed, and the last event that looks like a JSON-RPC response
    (has ``result`` or ``error``) is returned.

    Raises:
        ValueError: if the body is not valid JSON, or an event stream holds
            no JSON-RPC response.
    """
    content_type = response.headers.get("content-type", "")
    if not content_type.startswith("text/event-stream"):
        return response.json()

    events = []
    chunk = []
    # The trailing "" guarantees the final event is flushed even when the
    # stream does not end with a blank separator line.
    for raw_line in response.text.split("\n") + [""]:
        line = raw_line.rstrip("\r")
        if line.startswith("data:"):
            chunk.append(line[5:].lstrip())
        elif not line.strip() and chunk:
            payload = "\n".join(chunk).strip()
            chunk = []
            # Events with an empty data field carry no message; skip them.
            if payload:
                events.append(json.loads(payload))

    for event in reversed(events):
        if isinstance(event, dict) and ("result" in event or "error" in event):
            return event
    raise ValueError("no JSON-RPC response found in event stream")
# Script entry point: build the smoke-test coroutine and drive it to
# completion on a fresh event loop when the file is executed directly.
if __name__ == "__main__":
    smoke_test = test_mcp_server()
    asyncio.run(smoke_test)