Skip to main content
Glama

OpenAPI to Model Context Protocol (MCP)

#!/usr/bin/env python3 """ Test script for running FastMCP server live with SSE. """ import os import sys import logging import asyncio import httpx # Add src to path sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src')) async def test_fastmcp_live(): """Test FastMCP server running live with SSE.""" print("Testing FastMCP Live SSE Server") print("=" * 40) # Set up logging logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') try: # Test 1: Import and create server print("\n1. Creating FastMCP Server") from fastmcp_server import FastMCPOpenAPIServer from config import ServerConfig os.environ.clear() os.environ.update({ 'OPENAPI_URL': 'https://petstore3.swagger.io/api/v3/openapi.json', 'SERVER_NAME': 'petstore_fastmcp_live', 'MCP_HTTP_ENABLED': 'true', 'MCP_HTTP_HOST': '127.0.0.1', 'MCP_HTTP_PORT': '8004' }) config = ServerConfig() server = FastMCPOpenAPIServer(config) await server.initialize() print("✓ FastMCP server initialized") # Test 2: Start SSE server in background print("\n2. Starting SSE Server") async def run_server(): """Run the SSE server.""" await server.run_sse_async(host=config.mcp_http_host, port=config.mcp_http_port) # Start server as background task server_task = asyncio.create_task(run_server()) # Give server time to start await asyncio.sleep(3) print("✓ SSE server started") # Test 3: Test SSE endpoint print("\n3. Testing SSE Endpoint") async with httpx.AsyncClient() as client: base_url = f"http://{config.mcp_http_host}:{config.mcp_http_port}" # Test if server is responding try: # FastMCP SSE server might have different endpoints # Let's try to access the root and see what's available response = await client.get(f"{base_url}/", timeout=5.0) print(f"✓ Root endpoint: {response.status_code}") except Exception as e: print(f"Root endpoint: {e}") # Try SSE endpoint try: sse_response = await client.get(f"{base_url}/sse", timeout=5.0) print(f"✓ SSE endpoint accessible: {sse_response.status_code}") except Exception as e: print(f"SSE endpoint: {e}") # Try health endpoint if available try: health_response = await client.get(f"{base_url}/health", timeout=5.0) print(f"✓ Health endpoint: {health_response.status_code}") except Exception as e: print(f"Health endpoint: {e}") # Test 4: Try short SSE connection print("\n4. Testing Short SSE Connection") try: async with httpx.AsyncClient() as client: async with client.stream("GET", f"{base_url}/sse", timeout=5.0) as response: print(f"✓ SSE stream started: {response.status_code}") # Read a few events count = 0 async for chunk in response.aiter_text(): if chunk.strip(): print(f" SSE event: {chunk.strip()[:100]}...") count += 1 if count >= 3: break print(f"✓ Received {count} SSE events") except Exception as e: print(f"SSE connection test: {e}") # Stop server print("\n5. Shutting down") server_task.cancel() try: await server_task except asyncio.CancelledError: pass print("✓ Server stopped") print("\n" + "=" * 40) print("🎉 FastMCP Live Test Complete!") print("✅ FastMCP SSE server can start and run") print("✅ Server accepts HTTP connections") print("✅ Ready for MCP clients") return True except Exception as e: print(f"\n❌ FastMCP live test failed: {e}") import traceback traceback.print_exc() return False def main(): """Run FastMCP live tests.""" try: success = asyncio.run(test_fastmcp_live()) sys.exit(0 if success else 1) except KeyboardInterrupt: print("\nTest interrupted by user") sys.exit(1) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gujord/OpenAPI-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server