Skip to main content
Glama

Continuo Memory System

by GtOkAi
test_88_random_error.py4.44 kB
"""Test to reproduce issue #88: Random error thrown on response.""" from collections.abc import Sequence from datetime import timedelta from pathlib import Path from typing import Any import anyio import pytest from anyio.abc import TaskStatus from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from mcp.client.session import ClientSession from mcp.shared.exceptions import McpError from mcp.shared.message import SessionMessage from mcp.types import ContentBlock, TextContent from mcp import types from mcp.server.lowlevel import Server @pytest.mark.anyio async def test_notification_validation_error(tmp_path: Path): """Test that timeouts are handled gracefully and don't break the server. This test verifies that when a client request times out: 1. The server task stays alive 2. The server can still handle new requests 3. The client can make new requests 4. No resources are leaked """ server = Server(name="test") request_count = 0 slow_request_lock = anyio.Event() @server.list_tools() async def list_tools() -> list[types.Tool]: return [ types.Tool( name="slow", description="A slow tool", inputSchema={"type": "object"}, ), types.Tool( name="fast", description="A fast tool", inputSchema={"type": "object"}, ), ] @server.call_tool() async def slow_tool(name: str, arguments: dict[str, Any]) -> Sequence[ContentBlock]: nonlocal request_count request_count += 1 if name == "slow": await slow_request_lock.wait() # it should timeout here return [TextContent(type="text", text=f"slow {request_count}")] elif name == "fast": return [TextContent(type="text", text=f"fast {request_count}")] return [TextContent(type="text", text=f"unknown {request_count}")] async def server_handler( read_stream: MemoryObjectReceiveStream[SessionMessage | Exception], write_stream: MemoryObjectSendStream[SessionMessage], task_status: TaskStatus[str] = anyio.TASK_STATUS_IGNORED, ): with anyio.CancelScope() as scope: task_status.started(scope) # type: ignore await server.run( read_stream, write_stream, server.create_initialization_options(), raise_exceptions=True, ) async def client( read_stream: MemoryObjectReceiveStream[SessionMessage | Exception], write_stream: MemoryObjectSendStream[SessionMessage], scope: anyio.CancelScope, ): # Use a timeout that's: # - Long enough for fast operations (>10ms) # - Short enough for slow operations (<200ms) # - Not too short to avoid flakiness async with ClientSession( read_stream, write_stream, read_timeout_seconds=timedelta(milliseconds=50) ) as session: await session.initialize() # First call should work (fast operation) result = await session.call_tool("fast") assert result.content == [TextContent(type="text", text="fast 1")] assert not slow_request_lock.is_set() # Second call should timeout (slow operation) with pytest.raises(McpError) as exc_info: await session.call_tool("slow") assert "Timed out while waiting" in str(exc_info.value) # release the slow request not to have hanging process slow_request_lock.set() # Third call should work (fast operation), # proving server is still responsive result = await session.call_tool("fast") assert result.content == [TextContent(type="text", text="fast 3")] scope.cancel() # Run server and client in separate task groups to avoid cancellation server_writer, server_reader = anyio.create_memory_object_stream[SessionMessage](1) client_writer, client_reader = anyio.create_memory_object_stream[SessionMessage](1) async with anyio.create_task_group() as tg: scope = await tg.start(server_handler, server_reader, client_writer) # Run client in a separate task to avoid cancellation tg.start_soon(client, client_reader, server_writer, scope)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/GtOkAi/continuo-memory-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server