Skip to main content
Glama

MCP Python SDK

by imax09-wq
test_lifespan.py6.98 kB
"""Tests for lifespan functionality in both low-level and FastMCP servers.""" from collections.abc import AsyncIterator from contextlib import asynccontextmanager import anyio import pytest from pydantic import TypeAdapter from mcp.server.fastmcp import Context, FastMCP from mcp.server.lowlevel.server import NotificationOptions, Server from mcp.server.models import InitializationOptions from mcp.types import ( ClientCapabilities, Implementation, InitializeRequestParams, JSONRPCMessage, JSONRPCNotification, JSONRPCRequest, ) @pytest.mark.anyio async def test_lowlevel_server_lifespan(): """Test that lifespan works in low-level server.""" @asynccontextmanager async def test_lifespan(server: Server) -> AsyncIterator[dict[str, bool]]: """Test lifespan context that tracks startup/shutdown.""" context = {"started": False, "shutdown": False} try: context["started"] = True yield context finally: context["shutdown"] = True server = Server("test", lifespan=test_lifespan) # Create memory streams for testing send_stream1, receive_stream1 = anyio.create_memory_object_stream(100) send_stream2, receive_stream2 = anyio.create_memory_object_stream(100) # Create a tool that accesses lifespan context @server.call_tool() async def check_lifespan(name: str, arguments: dict) -> list: ctx = server.request_context assert isinstance(ctx.lifespan_context, dict) assert ctx.lifespan_context["started"] assert not ctx.lifespan_context["shutdown"] return [{"type": "text", "text": "true"}] # Run server in background task async with ( anyio.create_task_group() as tg, send_stream1, receive_stream1, send_stream2, receive_stream2, ): async def run_server(): await server.run( receive_stream1, send_stream2, InitializationOptions( server_name="test", server_version="0.1.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), raise_exceptions=True, ) tg.start_soon(run_server) # Initialize the server params = InitializeRequestParams( protocolVersion="2024-11-05", capabilities=ClientCapabilities(), clientInfo=Implementation(name="test-client", version="0.1.0"), ) await send_stream1.send( JSONRPCMessage( root=JSONRPCRequest( jsonrpc="2.0", id=1, method="initialize", params=TypeAdapter(InitializeRequestParams).dump_python(params), ) ) ) response = await receive_stream2.receive() # Send initialized notification await send_stream1.send( JSONRPCMessage( root=JSONRPCNotification( jsonrpc="2.0", method="notifications/initialized", ) ) ) # Call the tool to verify lifespan context await send_stream1.send( JSONRPCMessage( root=JSONRPCRequest( jsonrpc="2.0", id=2, method="tools/call", params={"name": "check_lifespan", "arguments": {}}, ) ) ) # Get response and verify response = await receive_stream2.receive() assert response.root.result["content"][0]["text"] == "true" # Cancel server task tg.cancel_scope.cancel() @pytest.mark.anyio async def test_fastmcp_server_lifespan(): """Test that lifespan works in FastMCP server.""" @asynccontextmanager async def test_lifespan(server: FastMCP) -> AsyncIterator[dict]: """Test lifespan context that tracks startup/shutdown.""" context = {"started": False, "shutdown": False} try: context["started"] = True yield context finally: context["shutdown"] = True server = FastMCP("test", lifespan=test_lifespan) # Create memory streams for testing send_stream1, receive_stream1 = anyio.create_memory_object_stream(100) send_stream2, receive_stream2 = anyio.create_memory_object_stream(100) # Add a tool that checks lifespan context @server.tool() def check_lifespan(ctx: Context) -> bool: """Tool that checks lifespan context.""" assert isinstance(ctx.request_context.lifespan_context, dict) assert ctx.request_context.lifespan_context["started"] assert not ctx.request_context.lifespan_context["shutdown"] return True # Run server in background task async with ( anyio.create_task_group() as tg, send_stream1, receive_stream1, send_stream2, receive_stream2, ): async def run_server(): await server._mcp_server.run( receive_stream1, send_stream2, server._mcp_server.create_initialization_options(), raise_exceptions=True, ) tg.start_soon(run_server) # Initialize the server params = InitializeRequestParams( protocolVersion="2024-11-05", capabilities=ClientCapabilities(), clientInfo=Implementation(name="test-client", version="0.1.0"), ) await send_stream1.send( JSONRPCMessage( root=JSONRPCRequest( jsonrpc="2.0", id=1, method="initialize", params=TypeAdapter(InitializeRequestParams).dump_python(params), ) ) ) response = await receive_stream2.receive() # Send initialized notification await send_stream1.send( JSONRPCMessage( root=JSONRPCNotification( jsonrpc="2.0", method="notifications/initialized", ) ) ) # Call the tool to verify lifespan context await send_stream1.send( JSONRPCMessage( root=JSONRPCRequest( jsonrpc="2.0", id=2, method="tools/call", params={"name": "check_lifespan", "arguments": {}}, ) ) ) # Get response and verify response = await receive_stream2.receive() assert response.root.result["content"][0]["text"] == "true" # Cancel server task tg.cancel_scope.cancel()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/imax09-wq/mcp-py'

If you have feedback or need assistance with the MCP directory API, please join our Discord server