Skip to main content
Glama
test_http_integration.py8.98 kB
#!/usr/bin/env python3 """ Integration test for MCP server using HTTP streaming transport. This avoids the stdio subprocess timing issues. """ import asyncio import json import sys import time from mcp.client.session import ClientSession async def test_http_transport(base_url: str = 'http://localhost:8000'): """Test MCP server with HTTP streaming transport.""" # Import the streamable http client try: from mcp.client.streamable_http import streamablehttp_client as http_client except ImportError: print('❌ Streamable HTTP client not available in MCP SDK') return False test_group_id = f'test_http_{int(time.time())}' print('🚀 Testing MCP Server with HTTP streaming transport') print(f' Server URL: {base_url}') print(f' Test Group: {test_group_id}') print('=' * 60) try: # Connect to the server via HTTP print('\n🔌 Connecting to server...') async with http_client(base_url) as (read_stream, write_stream): session = ClientSession(read_stream, write_stream) await session.initialize() print('✅ Connected successfully') # Test 1: List tools print('\n📋 Test 1: Listing tools...') try: result = await session.list_tools() tools = [tool.name for tool in result.tools] expected = [ 'add_memory', 'search_memory_nodes', 'search_memory_facts', 'get_episodes', 'delete_episode', 'clear_graph', ] found = [t for t in expected if t in tools] print(f' ✅ Found {len(tools)} tools ({len(found)}/{len(expected)} expected)') for tool in tools[:5]: print(f' - {tool}') except Exception as e: print(f' ❌ Failed: {e}') return False # Test 2: Add memory print('\n📝 Test 2: Adding memory...') try: result = await session.call_tool( 'add_memory', { 'name': 'Integration Test Episode', 'episode_body': 'This is a test episode created via HTTP transport integration test.', 'group_id': test_group_id, 'source': 'text', 'source_description': 'HTTP Integration Test', }, ) if result.content and result.content[0].text: response = result.content[0].text if 'success' in response.lower() or 'queued' in response.lower(): print(' ✅ Memory added successfully') else: print(f' ❌ Unexpected response: {response[:100]}') else: print(' ❌ No content in response') except Exception as e: print(f' ❌ Failed: {e}') # Test 3: Search nodes (with delay for processing) print('\n🔍 Test 3: Searching nodes...') await asyncio.sleep(2) # Wait for async processing try: result = await session.call_tool( 'search_memory_nodes', {'query': 'integration test episode', 'group_ids': [test_group_id], 'limit': 5}, ) if result.content and result.content[0].text: response = result.content[0].text try: data = json.loads(response) nodes = data.get('nodes', []) print(f' ✅ Search returned {len(nodes)} nodes') except Exception: # noqa: E722 print(f' ✅ Search completed: {response[:100]}') else: print(' ⚠️ No results (may be processing)') except Exception as e: print(f' ❌ Failed: {e}') # Test 4: Get episodes print('\n📚 Test 4: Getting episodes...') try: result = await session.call_tool( 'get_episodes', {'group_ids': [test_group_id], 'limit': 10} ) if result.content and result.content[0].text: response = result.content[0].text try: data = json.loads(response) episodes = data.get('episodes', []) print(f' ✅ Found {len(episodes)} episodes') except Exception: # noqa: E722 print(f' ✅ Episodes retrieved: {response[:100]}') else: print(' ⚠️ No episodes found') except Exception as e: print(f' ❌ Failed: {e}') # Test 5: Clear graph print('\n🧹 Test 5: Clearing graph...') try: result = await session.call_tool('clear_graph', {'group_id': test_group_id}) if result.content and result.content[0].text: response = result.content[0].text if 'success' in response.lower() or 'cleared' in response.lower(): print(' ✅ Graph cleared successfully') else: print(f' ✅ Clear completed: {response[:100]}') else: print(' ❌ No response') except Exception as e: print(f' ❌ Failed: {e}') print('\n' + '=' * 60) print('✅ All integration tests completed!') return True except Exception as e: print(f'\n❌ Connection failed: {e}') return False async def test_sse_transport(base_url: str = 'http://localhost:8000'): """Test MCP server with SSE transport.""" # Import the SSE client try: from mcp.client.sse import sse_client except ImportError: print('❌ SSE client not available in MCP SDK') return False test_group_id = f'test_sse_{int(time.time())}' print('🚀 Testing MCP Server with SSE transport') print(f' Server URL: {base_url}/sse') print(f' Test Group: {test_group_id}') print('=' * 60) try: # Connect to the server via SSE print('\n🔌 Connecting to server...') async with sse_client(f'{base_url}/sse') as (read_stream, write_stream): session = ClientSession(read_stream, write_stream) await session.initialize() print('✅ Connected successfully') # Run same tests as HTTP print('\n📋 Test 1: Listing tools...') try: result = await session.list_tools() tools = [tool.name for tool in result.tools] print(f' ✅ Found {len(tools)} tools') for tool in tools[:3]: print(f' - {tool}') except Exception as e: print(f' ❌ Failed: {e}') return False print('\n' + '=' * 60) print('✅ SSE transport test completed!') return True except Exception as e: print(f'\n❌ SSE connection failed: {e}') return False async def main(): """Run integration tests.""" # Check command line arguments if len(sys.argv) < 2: print('Usage: python test_http_integration.py <transport> [host] [port]') print(' transport: http or sse') print(' host: server host (default: localhost)') print(' port: server port (default: 8000)') sys.exit(1) transport = sys.argv[1].lower() host = sys.argv[2] if len(sys.argv) > 2 else 'localhost' port = sys.argv[3] if len(sys.argv) > 3 else '8000' base_url = f'http://{host}:{port}' # Check if server is running import httpx try: async with httpx.AsyncClient() as client: # Try to connect to the server await client.get(base_url, timeout=2.0) except Exception: # noqa: E722 print(f'⚠️ Server not responding at {base_url}') print('Please start the server with one of these commands:') print(f' uv run main.py --transport http --port {port}') print(f' uv run main.py --transport sse --port {port}') sys.exit(1) # Run the appropriate test if transport == 'http': success = await test_http_transport(base_url) elif transport == 'sse': success = await test_sse_transport(base_url) else: print(f'❌ Unknown transport: {transport}') sys.exit(1) sys.exit(0 if success else 1) if __name__ == '__main__': asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/getzep/graphiti'

If you have feedback or need assistance with the MCP directory API, please join our Discord server