Skip to main content
Glama
GreptimeTeam

GreptimeDB MCP Server

Official
by GreptimeTeam
test_http_transport.py8.83 kB
"""Blackbox tests for HTTP transport modes (streamable-http and sse).""" import asyncio import json import pytest import socket from contextlib import closing from unittest.mock import patch import httpx def find_free_port() -> int: """Find a free port on localhost.""" with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: s.bind(("", 0)) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) return s.getsockname()[1] @pytest.fixture def free_port(): """Get a free port for testing.""" return find_free_port() @pytest.fixture def mock_db_connection(): """Mock database connection for testing.""" with patch("greptimedb_mcp_server.server.connect") as mock_connect: mock_conn = mock_connect.return_value.__enter__.return_value mock_cursor = mock_conn.cursor.return_value.__enter__.return_value mock_cursor.fetchone.return_value = ("GreptimeDB 0.9.0",) yield mock_connect class TestStreamableHttpTransport: """Tests for streamable-http transport mode.""" @pytest.mark.asyncio async def test_initialize_returns_valid_mcp_response( self, free_port, mock_db_connection ): """Test that initialize request returns valid MCP protocol response.""" from mcp.server.fastmcp import FastMCP test_mcp = FastMCP("test_server", host="127.0.0.1", port=free_port) @test_mcp.tool() def ping() -> str: return "pong" async def run_server(): await test_mcp.run_streamable_http_async() server_task = asyncio.create_task(run_server()) await asyncio.sleep(0.5) try: async with httpx.AsyncClient(trust_env=False) as client: response = await client.post( f"http://127.0.0.1:{free_port}/mcp", json={ "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "test", "version": "1.0"}, }, }, headers={ "Content-Type": "application/json", "Accept": "application/json, text/event-stream", }, timeout=5.0, ) assert response.status_code == 200 # Response is SSE format: "event: message\r\ndata: {...}\r\n" # Extract JSON from the data line content = response.text data = None for line in content.split("\n"): if line.startswith("data:"): data = json.loads(line[5:].strip()) break assert data is not None, "No data line found in SSE response" assert data.get("jsonrpc") == "2.0" assert data.get("id") == 1 assert "result" in data # Verify MCP initialize response fields result = data["result"] assert "protocolVersion" in result assert "capabilities" in result assert "serverInfo" in result assert result["serverInfo"]["name"] == "test_server" finally: server_task.cancel() try: await server_task except asyncio.CancelledError: pass @pytest.mark.asyncio async def test_mcp_endpoint_rejects_invalid_json( self, free_port, mock_db_connection ): """Test that /mcp endpoint returns error for invalid JSON.""" from mcp.server.fastmcp import FastMCP test_mcp = FastMCP("test_server", host="127.0.0.1", port=free_port) async def run_server(): await test_mcp.run_streamable_http_async() server_task = asyncio.create_task(run_server()) await asyncio.sleep(0.5) try: async with httpx.AsyncClient(trust_env=False) as client: response = await client.post( f"http://127.0.0.1:{free_port}/mcp", content=b"not valid json", headers={ "Content-Type": "application/json", "Accept": "application/json, text/event-stream", }, timeout=5.0, ) # Should return error status for invalid request assert response.status_code in [400, 422, 500] finally: server_task.cancel() try: await server_task except asyncio.CancelledError: pass class TestSseTransport: """Tests for SSE transport mode.""" @pytest.mark.asyncio async def test_sse_endpoint_returns_endpoint_event( self, free_port, mock_db_connection ): """Test that /sse endpoint returns SSE event with messages endpoint.""" from mcp.server.fastmcp import FastMCP test_mcp = FastMCP("test_server", host="127.0.0.1", port=free_port) async def run_server(): await test_mcp.run_sse_async() server_task = asyncio.create_task(run_server()) await asyncio.sleep(0.5) try: async with httpx.AsyncClient(trust_env=False) as client: async with client.stream( "GET", f"http://127.0.0.1:{free_port}/sse", timeout=2.0 ) as response: assert response.status_code == 200 assert "text/event-stream" in response.headers.get( "content-type", "" ) # Read first SSE event (endpoint announcement) event_data = "" async for line in response.aiter_lines(): if line.startswith("data:"): event_data = line[5:].strip() break # Verify endpoint URL is provided assert "/messages/" in event_data except httpx.ReadTimeout: pass # SSE stream stays open, timeout is expected finally: server_task.cancel() try: await server_task except asyncio.CancelledError: pass @pytest.mark.asyncio async def test_messages_endpoint_rejects_invalid_session( self, free_port, mock_db_connection ): """Test that /messages/ endpoint rejects requests without valid session.""" from mcp.server.fastmcp import FastMCP test_mcp = FastMCP("test_server", host="127.0.0.1", port=free_port) async def run_server(): await test_mcp.run_sse_async() server_task = asyncio.create_task(run_server()) await asyncio.sleep(0.5) try: async with httpx.AsyncClient(trust_env=False) as client: response = await client.post( f"http://127.0.0.1:{free_port}/messages/", json={"jsonrpc": "2.0", "id": 1, "method": "ping"}, timeout=5.0, ) # Without valid session ID, should return error assert response.status_code in [400, 404, 500] finally: server_task.cancel() try: await server_task except asyncio.CancelledError: pass class TestTransportConfig: """Tests for transport configuration.""" def test_config_transport_choices(self): """Test that transport config only accepts valid choices.""" from greptimedb_mcp_server.config import Config import sys # Valid transports should work for transport in ["stdio", "sse", "streamable-http"]: with patch.dict("os.environ", {}, clear=True): with patch.object( sys, "argv", ["test", "--transport", transport], ): config = Config.from_env_arguments() assert config.transport == transport def test_config_invalid_transport_rejected(self): """Test that invalid transport is rejected.""" import sys with patch.dict("os.environ", {}, clear=True): with patch.object(sys, "argv", ["test", "--transport", "invalid"]): with pytest.raises(SystemExit): from greptimedb_mcp_server.config import Config Config.from_env_arguments()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/GreptimeTeam/greptimedb-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server