Skip to main content
Glama

OpenAPI to Model Context Protocol (MCP)

#!/usr/bin/env python3 """ Test script to validate SSE (Server-Sent Events) functionality. """ import os import sys import logging import asyncio import httpx import time # Add src to path sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src')) import server async def test_sse_functionality(): """Test SSE functionality.""" print("Testing SSE (Server-Sent Events) Functionality") print("=" * 50) # Set up logging logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') try: # Test 1: SSE Configuration print("\n1. Testing SSE Configuration") os.environ.clear() os.environ.update({ 'OPENAPI_URL': 'https://petstore3.swagger.io/api/v3/openapi.json', 'SERVER_NAME': 'petstore_sse', 'SSE_ENABLED': 'true', 'SSE_HOST': '127.0.0.1', 'SSE_PORT': '8001' }) config = server.ServerConfig() print(f"✓ SSE Configuration:") print(f" - Enabled: {config.sse_enabled}") print(f" - Host: {config.sse_host}") print(f" - Port: {config.sse_port}") # Test 2: Server with SSE Support print("\n2. Testing Server with SSE Support") srv = server.MCPServer(config) print(f"✓ SSE Manager created: {srv.sse_manager is not None}") print(f"✓ SSE Server Manager created: {srv.sse_server_manager is not None}") print(f"✓ SSE Tool Factory created: {srv.sse_tool_factory is not None}") # Test 3: Initialize and Register Tools print("\n3. Testing Tool Registration with SSE") srv.initialize() api_tools = srv.register_openapi_tools() srv.register_standard_tools() print(f"✓ API tools registered: {api_tools}") print(f"✓ Total tools: {len(srv.registered_tools)}") # Check for SSE-specific tools sse_tools = [name for name in srv.registered_tools.keys() if 'sse' in name.lower()] print(f"✓ SSE-specific tools: {len(sse_tools)}") for tool in sse_tools: print(f" - {tool}") # Test 4: Check Tools for Streaming Support print("\n4. Testing Streaming Support in Tools") streaming_tools = [] for tool_name, tool_data in srv.registered_tools.items(): metadata = tool_data.get('metadata', {}) if metadata.get('streaming_supported', False): streaming_tools.append(tool_name) print(f"✓ Tools with streaming support: {len(streaming_tools)}") for tool in streaming_tools[:5]: # Show first 5 print(f" - {tool}") # Test 5: SSE Manager Operations print("\n5. Testing SSE Manager Operations") if srv.sse_manager: # Create a test connection connection = srv.sse_manager.create_connection() print(f"✓ Test connection created: {connection.connection_id}") print(f"✓ Active connections: {srv.sse_manager.get_connection_count()}") # Test connection cleanup await srv.sse_manager.remove_connection(connection.connection_id) print(f"✓ Connection removed: {srv.sse_manager.get_connection_count()} remaining") # Test 6: SSE Server URLs print("\n6. Testing SSE Server URLs") if srv.sse_server_manager: health_url = srv.sse_server_manager.get_health_url() connections_url = srv.sse_server_manager.get_connections_url() print(f"✓ Health URL: {health_url}") print(f"✓ Connections URL: {connections_url}") # Test 7: Tool Metadata for Streaming print("\n7. Testing Tool Metadata Enhancement") sample_tool_name = list(srv.registered_tools.keys())[0] sample_metadata = srv.registered_tools[sample_tool_name]['metadata'] # Check for streaming parameters stream_params = [p for p in sample_metadata.get('parameters', []) if p.get('name') == 'stream'] if stream_params: print("✓ Stream parameter found in tool metadata:") print(f" - Type: {stream_params[0].get('type')}") print(f" - Description: {stream_params[0].get('description')}") print("\n" + "=" * 50) print("✅ SSE functionality tests completed successfully!") print("✅ SSE configuration and components properly initialized") print("✅ Tools enhanced with streaming support") print("✅ SSE manager operations functional") print("✅ Ready for real-time streaming responses") return True except Exception as e: print(f"\n❌ SSE test failed: {e}") import traceback traceback.print_exc() return False async def test_sse_server_startup(): """Test SSE server startup (requires actual server run).""" print("\n" + "=" * 50) print("Testing SSE Server Startup (5 second test)") print("=" * 50) try: # Configure for SSE os.environ.update({ 'OPENAPI_URL': 'https://petstore3.swagger.io/api/v3/openapi.json', 'SERVER_NAME': 'petstore_sse_test', 'SSE_ENABLED': 'true', 'SSE_HOST': '127.0.0.1', 'SSE_PORT': '8002' }) config = server.ServerConfig() srv = server.MCPServer(config) srv.initialize() srv.register_openapi_tools() srv.register_standard_tools() # Start SSE server print("Starting SSE server...") await srv.start_sse_server() print("✓ SSE server started") # Give it a moment to start await asyncio.sleep(2) # Test health endpoint try: async with httpx.AsyncClient() as client: health_url = srv.sse_server_manager.get_health_url() response = await client.get(health_url, timeout=5.0) if response.status_code == 200: health_data = response.json() print("✓ Health endpoint accessible:") print(f" - Status: {health_data.get('status')}") print(f" - Active connections: {health_data.get('active_connections')}") else: print(f"✗ Health endpoint returned {response.status_code}") except Exception as e: print(f"✗ Health endpoint test failed: {e}") # Stop SSE server print("Stopping SSE server...") await srv.stop_sse_server() print("✓ SSE server stopped") return True except Exception as e: print(f"❌ SSE server test failed: {e}") return False def main(): """Run SSE tests.""" async def run_tests(): success1 = await test_sse_functionality() success2 = await test_sse_server_startup() return success1 and success2 try: success = asyncio.run(run_tests()) sys.exit(0 if success else 1) except KeyboardInterrupt: print("\nTest interrupted by user") sys.exit(1) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gujord/OpenAPI-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server