Skip to main content
Glama

mcp-run-python

Official
by pydantic
test_a2a.py39.7 kB
import uuid import anyio import httpx import pytest from asgi_lifespan import LifespanManager from inline_snapshot import snapshot from pydantic import BaseModel from pydantic_ai import ( Agent, ModelMessage, ModelRequest, ModelResponse, TextPart as PydanticAITextPart, ThinkingPart, ToolCallPart, ToolReturnPart, UserPromptPart, ) from pydantic_ai.models.function import AgentInfo, FunctionModel from pydantic_ai.usage import RequestUsage from .conftest import IsDatetime, IsStr, try_import with try_import() as imports_successful: from fasta2a.client import A2AClient from fasta2a.schema import DataPart, FilePart, Message, TextPart from fasta2a.storage import InMemoryStorage pytestmark = [ pytest.mark.skipif(not imports_successful(), reason='fasta2a not installed'), pytest.mark.anyio, pytest.mark.vcr, ] def return_string(_: list[ModelMessage], info: AgentInfo) -> ModelResponse: assert info.output_tools is not None args_json = '{"response": ["foo", "bar"]}' return ModelResponse(parts=[ToolCallPart(info.output_tools[0].name, args_json)]) model = FunctionModel(return_string) # Define a test Pydantic model class UserProfile(BaseModel): name: str age: int email: str def return_pydantic_model(_: list[ModelMessage], info: AgentInfo) -> ModelResponse: assert info.output_tools is not None args_json = '{"name": "John Doe", "age": 30, "email": "john@example.com"}' return ModelResponse(parts=[ToolCallPart(info.output_tools[0].name, args_json)]) pydantic_model = FunctionModel(return_pydantic_model) async def test_a2a_pydantic_model_output(): """Test that Pydantic model outputs have correct metadata including JSON schema.""" agent = Agent(model=pydantic_model, output_type=UserProfile) app = agent.to_a2a() async with LifespanManager(app): transport = httpx.ASGITransport(app) async with httpx.AsyncClient(transport=transport) as http_client: a2a_client = A2AClient(http_client=http_client) message = Message( role='user', parts=[TextPart(text='Get user profile', kind='text')], kind='message', message_id=str(uuid.uuid4()), ) response = await a2a_client.send_message(message=message) assert 'error' not in response assert 'result' in response result = response['result'] assert result['kind'] == 'task' task_id = result['id'] # Wait for completion while task := await a2a_client.get_task(task_id): # pragma: no branch if 'result' in task and task['result']['status']['state'] == 'completed': result = task['result'] break await anyio.sleep(0.1) # Check artifacts assert 'artifacts' in result assert len(result['artifacts']) == 1 artifact = result['artifacts'][0] # Verify the data assert artifact['parts'][0]['kind'] == 'data' assert artifact['parts'][0]['data'] == { 'result': {'name': 'John Doe', 'age': 30, 'email': 'john@example.com'} } metadata = artifact['parts'][0].get('metadata') assert metadata is not None assert metadata['json_schema'] == snapshot( { 'properties': { 'name': {'title': 'Name', 'type': 'string'}, 'age': {'title': 'Age', 'type': 'integer'}, 'email': {'title': 'Email', 'type': 'string'}, }, 'required': ['name', 'age', 'email'], 'title': 'UserProfile', 'type': 'object', } ) assert result.get('history') == snapshot( [ { 'role': 'user', 'parts': [{'kind': 'text', 'text': 'Get user profile'}], 'kind': 'message', 'message_id': IsStr(), 'context_id': IsStr(), 'task_id': IsStr(), } ] ) async def test_a2a_runtime_error_without_lifespan(): agent = Agent(model=model, output_type=tuple[str, str]) app = agent.to_a2a() transport = httpx.ASGITransport(app) async with httpx.AsyncClient(transport=transport) as http_client: a2a_client = A2AClient(http_client=http_client) message = Message( role='user', parts=[TextPart(text='Hello, world!', kind='text')], kind='message', message_id=str(uuid.uuid4()), ) with pytest.raises(RuntimeError, match='TaskManager was not properly initialized.'): await a2a_client.send_message(message=message) async def test_a2a_simple(): agent = Agent(model=model, output_type=tuple[str, str]) app = agent.to_a2a() async with LifespanManager(app): transport = httpx.ASGITransport(app) async with httpx.AsyncClient(transport=transport) as http_client: a2a_client = A2AClient(http_client=http_client) message = Message( role='user', parts=[TextPart(text='Hello, world!', kind='text')], kind='message', message_id=str(uuid.uuid4()), ) response = await a2a_client.send_message(message=message) assert 'error' not in response assert 'result' in response result = response['result'] assert result['kind'] == 'task' assert result == snapshot( { 'id': IsStr(), 'context_id': IsStr(), 'kind': 'task', 'status': {'state': 'submitted', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ { 'role': 'user', 'parts': [{'kind': 'text', 'text': 'Hello, world!'}], 'kind': 'message', 'message_id': IsStr(), 'context_id': IsStr(), 'task_id': IsStr(), } ], } ) task_id = result['id'] while task := await a2a_client.get_task(task_id): # pragma: no branch if 'result' in task and task['result']['status']['state'] == 'completed': break await anyio.sleep(0.1) assert task == snapshot( { 'jsonrpc': '2.0', 'id': None, 'result': { 'id': IsStr(), 'context_id': IsStr(), 'kind': 'task', 'status': {'state': 'completed', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ { 'role': 'user', 'parts': [{'kind': 'text', 'text': 'Hello, world!'}], 'kind': 'message', 'message_id': IsStr(), 'context_id': IsStr(), 'task_id': IsStr(), } ], 'artifacts': [ { 'artifact_id': IsStr(), 'name': 'result', 'parts': [ { 'metadata': {'json_schema': {'items': {}, 'type': 'array'}}, 'kind': 'data', 'data': {'result': ['foo', 'bar']}, } ], } ], }, } ) async def test_a2a_file_message_with_file(): agent = Agent(model=model, output_type=tuple[str, str]) app = agent.to_a2a() async with LifespanManager(app): transport = httpx.ASGITransport(app) async with httpx.AsyncClient(transport=transport) as http_client: a2a_client = A2AClient(http_client=http_client) message = Message( role='user', parts=[ FilePart( kind='file', file={'uri': 'https://example.com/file.txt', 'mime_type': 'text/plain'}, ) ], kind='message', message_id=str(uuid.uuid4()), ) response = await a2a_client.send_message(message=message) assert 'error' not in response assert 'result' in response result = response['result'] assert result['kind'] == 'task' assert result == snapshot( { 'id': IsStr(), 'context_id': IsStr(), 'kind': 'task', 'status': {'state': 'submitted', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ { 'role': 'user', 'parts': [ { 'kind': 'file', 'file': {'mime_type': 'text/plain', 'uri': 'https://example.com/file.txt'}, } ], 'kind': 'message', 'message_id': IsStr(), 'context_id': IsStr(), 'task_id': IsStr(), } ], } ) task_id = result['id'] while task := await a2a_client.get_task(task_id): # pragma: no branch if 'result' in task and task['result']['status']['state'] == 'completed': break await anyio.sleep(0.1) assert task == snapshot( { 'jsonrpc': '2.0', 'id': None, 'result': { 'id': IsStr(), 'context_id': IsStr(), 'kind': 'task', 'status': {'state': 'completed', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ { 'role': 'user', 'parts': [ { 'kind': 'file', 'file': {'mime_type': 'text/plain', 'uri': 'https://example.com/file.txt'}, } ], 'kind': 'message', 'message_id': IsStr(), 'context_id': IsStr(), 'task_id': IsStr(), } ], 'artifacts': [ { 'artifact_id': IsStr(), 'name': 'result', 'parts': [ { 'metadata': {'json_schema': {'items': {}, 'type': 'array'}}, 'kind': 'data', 'data': {'result': ['foo', 'bar']}, } ], } ], }, } ) async def test_a2a_file_message_with_file_content(): agent = Agent(model=model, output_type=tuple[str, str]) app = agent.to_a2a() async with LifespanManager(app): transport = httpx.ASGITransport(app) async with httpx.AsyncClient(transport=transport) as http_client: a2a_client = A2AClient(http_client=http_client) message = Message( role='user', parts=[ FilePart(file={'bytes': 'foo', 'mime_type': 'text/plain'}, kind='file'), ], kind='message', message_id=str(uuid.uuid4()), ) response = await a2a_client.send_message(message=message) assert 'error' not in response assert 'result' in response result = response['result'] assert result['kind'] == 'task' assert result == snapshot( { 'id': IsStr(), 'context_id': IsStr(), 'kind': 'task', 'status': {'state': 'submitted', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ { 'role': 'user', 'parts': [{'kind': 'file', 'file': {'bytes': 'foo', 'mime_type': 'text/plain'}}], 'kind': 'message', 'message_id': IsStr(), 'context_id': IsStr(), 'task_id': IsStr(), } ], } ) task_id = result['id'] while task := await a2a_client.get_task(task_id): # pragma: no branch if 'result' in task and task['result']['status']['state'] == 'completed': break await anyio.sleep(0.1) assert task == snapshot( { 'jsonrpc': '2.0', 'id': None, 'result': { 'id': IsStr(), 'context_id': IsStr(), 'kind': 'task', 'status': {'state': 'completed', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ { 'role': 'user', 'parts': [{'kind': 'file', 'file': {'bytes': 'foo', 'mime_type': 'text/plain'}}], 'kind': 'message', 'message_id': IsStr(), 'context_id': IsStr(), 'task_id': IsStr(), } ], 'artifacts': [ { 'artifact_id': IsStr(), 'name': 'result', 'parts': [ { 'metadata': {'json_schema': {'items': {}, 'type': 'array'}}, 'kind': 'data', 'data': {'result': ['foo', 'bar']}, } ], } ], }, } ) async def test_a2a_file_message_with_data(): agent = Agent(model=model, output_type=tuple[str, str]) app = agent.to_a2a() async with LifespanManager(app): transport = httpx.ASGITransport(app) async with httpx.AsyncClient(transport=transport) as http_client: a2a_client = A2AClient(http_client=http_client) message = Message( role='user', parts=[DataPart(kind='data', data={'foo': 'bar'})], kind='message', message_id=str(uuid.uuid4()), ) response = await a2a_client.send_message(message=message) assert 'error' not in response assert 'result' in response result = response['result'] assert result['kind'] == 'task' assert result == snapshot( { 'id': IsStr(), 'context_id': IsStr(), 'kind': 'task', 'status': {'state': 'submitted', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ { 'role': 'user', 'parts': [{'kind': 'data', 'data': {'foo': 'bar'}}], 'kind': 'message', 'message_id': IsStr(), 'context_id': IsStr(), 'task_id': IsStr(), } ], } ) task_id = result['id'] while task := await a2a_client.get_task(task_id): # pragma: no branch if 'result' in task and task['result']['status']['state'] == 'failed': break await anyio.sleep(0.1) assert task == snapshot( { 'jsonrpc': '2.0', 'id': None, 'result': { 'id': IsStr(), 'context_id': IsStr(), 'kind': 'task', 'status': {'state': 'failed', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ { 'role': 'user', 'parts': [{'kind': 'data', 'data': {'foo': 'bar'}}], 'kind': 'message', 'message_id': IsStr(), 'context_id': IsStr(), 'task_id': IsStr(), } ], }, } ) async def test_a2a_error_handling(): """Test that errors during task execution properly update task state.""" def raise_error(_: list[ModelMessage], info: AgentInfo) -> ModelResponse: raise RuntimeError('Test error during agent execution') error_model = FunctionModel(raise_error) agent = Agent(model=error_model, output_type=str) app = agent.to_a2a() async with LifespanManager(app): transport = httpx.ASGITransport(app) async with httpx.AsyncClient(transport=transport) as http_client: a2a_client = A2AClient(http_client=http_client) message = Message( role='user', parts=[TextPart(text='Hello, world!', kind='text')], kind='message', message_id=str(uuid.uuid4()), ) response = await a2a_client.send_message(message=message) assert 'error' not in response assert 'result' in response result = response['result'] assert result['kind'] == 'task' task_id = result['id'] # Wait for task to fail await anyio.sleep(0.1) task = await a2a_client.get_task(task_id) assert 'result' in task assert task['result']['status']['state'] == 'failed' async def test_a2a_multiple_tasks_same_context(): """Test that multiple tasks can share the same context_id with accumulated history.""" messages_received: list[list[ModelMessage]] = [] def track_messages(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse: # Store a copy of the messages received by the model messages_received.append(messages.copy()) # Return the standard response assert info.output_tools is not None args_json = '{"response": ["foo", "bar"]}' return ModelResponse(parts=[ToolCallPart(info.output_tools[0].name, args_json)]) tracking_model = FunctionModel(track_messages) agent = Agent(model=tracking_model, output_type=tuple[str, str]) app = agent.to_a2a() async with LifespanManager(app): transport = httpx.ASGITransport(app) async with httpx.AsyncClient(transport=transport) as http_client: a2a_client = A2AClient(http_client=http_client) # First message - should create a new context message1 = Message( role='user', parts=[TextPart(text='First message', kind='text')], kind='message', message_id=str(uuid.uuid4()), ) response1 = await a2a_client.send_message(message=message1) assert 'error' not in response1 assert 'result' in response1 result1 = response1['result'] assert result1['kind'] == 'task' task1_id = result1['id'] context_id = result1['context_id'] # Wait for first task to complete await anyio.sleep(0.1) task1 = await a2a_client.get_task(task1_id) assert 'result' in task1 assert task1['result']['status']['state'] == 'completed' # Verify the model received at least one message assert len(messages_received) == 1 first_run_history = messages_received[0] assert first_run_history == snapshot( [ModelRequest(parts=[UserPromptPart(content='First message', timestamp=IsDatetime())])] ) # Second message - reuse the same context_id message2 = Message( role='user', parts=[TextPart(text='Second message', kind='text')], kind='message', context_id=context_id, message_id=str(uuid.uuid4()), ) response2 = await a2a_client.send_message(message=message2) assert 'error' not in response2 assert 'result' in response2 result2 = response2['result'] assert result2['kind'] == 'task' task2_id = result2['id'] # Verify we got a new task ID but same context ID assert task2_id != task1_id assert result2['context_id'] == context_id # Wait for second task to complete while task2 := await a2a_client.get_task(task2_id): # pragma: no branch if 'result' in task2 and task2['result']['status']['state'] == 'completed': break await anyio.sleep(0.1) # Verify the model received the full history on the second call assert len(messages_received) == 2 second_run_history = messages_received[1] assert second_run_history[0] == first_run_history[0] assert second_run_history == snapshot( [ ModelRequest(parts=[UserPromptPart(content='First message', timestamp=IsDatetime())]), ModelResponse( parts=[ ToolCallPart( tool_name='final_result', args='{"response": ["foo", "bar"]}', tool_call_id=IsStr() ) ], usage=RequestUsage(input_tokens=52, output_tokens=7), model_name='function:track_messages:', timestamp=IsDatetime(), ), ModelRequest( parts=[ ToolReturnPart( tool_name='final_result', content='Final result processed.', tool_call_id=IsStr(), timestamp=IsDatetime(), ), UserPromptPart(content='Second message', timestamp=IsDatetime()), ], ), ] ) async def test_a2a_thinking_response(): """Test that ModelResponse messages with ThinkingPart are properly handled.""" def return_thinking_response(_: list[ModelMessage], info: AgentInfo) -> ModelResponse: assert info.output_tools is not None # Create a response with thinking part and text part return ModelResponse( parts=[ ThinkingPart(content='Let me think about this...', id='thinking_1'), PydanticAITextPart(content="Here's my response"), ] ) thinking_model = FunctionModel(return_thinking_response) agent = Agent(model=thinking_model, output_type=str) app = agent.to_a2a() async with LifespanManager(app): transport = httpx.ASGITransport(app) async with httpx.AsyncClient(transport=transport) as http_client: a2a_client = A2AClient(http_client=http_client) message = Message( role='user', parts=[TextPart(text='Hello, world!', kind='text')], kind='message', message_id=str(uuid.uuid4()), ) response = await a2a_client.send_message(message=message) assert 'error' not in response assert 'result' in response result = response['result'] assert result['kind'] == 'task' task_id = result['id'] # Wait for completion await anyio.sleep(0.1) task = await a2a_client.get_task(task_id) assert 'result' in task assert task['result'] == snapshot( { 'id': IsStr(), 'context_id': IsStr(), 'kind': 'task', 'status': {'state': 'completed', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ { 'role': 'user', 'parts': [{'kind': 'text', 'text': 'Hello, world!'}], 'kind': 'message', 'message_id': IsStr(), 'context_id': IsStr(), 'task_id': IsStr(), }, { 'role': 'agent', 'parts': [ { 'metadata': {'type': 'thinking', 'thinking_id': 'thinking_1', 'signature': None}, 'kind': 'text', 'text': 'Let me think about this...', }, {'kind': 'text', 'text': "Here's my response"}, ], 'kind': 'message', 'message_id': IsStr(), 'context_id': IsStr(), 'task_id': IsStr(), }, ], 'artifacts': [ { 'artifact_id': IsStr(), 'name': 'result', 'parts': [{'kind': 'text', 'text': "Here's my response"}], } ], } ) async def test_a2a_multiple_messages(): agent = Agent(model=model, output_type=tuple[str, str]) storage = InMemoryStorage() app = agent.to_a2a(storage=storage) async with LifespanManager(app): transport = httpx.ASGITransport(app) async with httpx.AsyncClient(transport=transport) as http_client: a2a_client = A2AClient(http_client=http_client) message = Message( role='user', parts=[TextPart(text='Hello, world!', kind='text')], kind='message', message_id=str(uuid.uuid4()), ) response = await a2a_client.send_message(message=message) assert response == snapshot( { 'jsonrpc': '2.0', 'id': IsStr(), 'result': { 'id': IsStr(), 'context_id': IsStr(), 'kind': 'task', 'status': {'state': 'submitted', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ { 'role': 'user', 'parts': [{'kind': 'text', 'text': 'Hello, world!'}], 'kind': 'message', 'message_id': IsStr(), 'context_id': IsStr(), 'task_id': IsStr(), } ], }, } ) # NOTE: We include the agent history before we start working on the task. assert 'result' in response result = response['result'] assert result['kind'] == 'task' task_id = result['id'] task = storage.tasks[task_id] assert 'history' in task task['history'].append( Message( role='agent', parts=[TextPart(text='Whats up?', kind='text')], kind='message', message_id=str(uuid.uuid4()), ) ) response = await a2a_client.get_task(task_id) assert response == snapshot( { 'jsonrpc': '2.0', 'id': None, 'result': { 'id': IsStr(), 'context_id': IsStr(), 'kind': 'task', 'status': {'state': 'submitted', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ { 'role': 'user', 'parts': [{'kind': 'text', 'text': 'Hello, world!'}], 'kind': 'message', 'message_id': IsStr(), 'context_id': IsStr(), 'task_id': IsStr(), }, { 'role': 'agent', 'parts': [{'kind': 'text', 'text': 'Whats up?'}], 'kind': 'message', 'message_id': IsStr(), }, ], }, } ) while task := await a2a_client.get_task(task_id): # pragma: no branch if 'result' in task and task['result']['status']['state'] == 'completed': break await anyio.sleep(0.1) assert task == snapshot( { 'jsonrpc': '2.0', 'id': None, 'result': { 'id': IsStr(), 'context_id': IsStr(), 'kind': 'task', 'status': {'state': 'completed', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ { 'role': 'user', 'parts': [{'kind': 'text', 'text': 'Hello, world!'}], 'kind': 'message', 'message_id': IsStr(), 'context_id': IsStr(), 'task_id': IsStr(), }, { 'role': 'agent', 'parts': [{'kind': 'text', 'text': 'Whats up?'}], 'kind': 'message', 'message_id': IsStr(), }, ], 'artifacts': [ { 'artifact_id': IsStr(), 'name': 'result', 'parts': [ { 'metadata': {'json_schema': {'items': {}, 'type': 'array'}}, 'kind': 'data', 'data': {'result': ['foo', 'bar']}, } ], } ], }, } ) async def test_a2a_multiple_send_task_messages(): agent = Agent(model=model, output_type=tuple[str, str]) storage = InMemoryStorage() app = agent.to_a2a(storage=storage) async with LifespanManager(app): transport = httpx.ASGITransport(app) async with httpx.AsyncClient(transport=transport) as http_client: a2a_client = A2AClient(http_client=http_client) message = Message( role='user', parts=[TextPart(text='Hello, world!', kind='text')], kind='message', message_id=str(uuid.uuid4()), ) response = await a2a_client.send_message(message=message) assert response == snapshot( { 'jsonrpc': '2.0', 'id': IsStr(), 'result': { 'id': IsStr(), 'context_id': IsStr(), 'kind': 'task', 'status': {'state': 'submitted', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ { 'role': 'user', 'parts': [{'kind': 'text', 'text': 'Hello, world!'}], 'kind': 'message', 'message_id': IsStr(), 'context_id': IsStr(), 'task_id': IsStr(), } ], }, } ) assert 'result' in response result = response['result'] assert result['kind'] == 'task' task_id = result['id'] context_id = result['context_id'] while task := await a2a_client.get_task(task_id): # pragma: no branch if 'result' in task and task['result']['status']['state'] == 'completed': result = task['result'] break await anyio.sleep(0.1) assert result == snapshot( { 'id': IsStr(), 'context_id': IsStr(), 'kind': 'task', 'status': {'state': 'completed', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ { 'role': 'user', 'parts': [{'kind': 'text', 'text': 'Hello, world!'}], 'kind': 'message', 'message_id': IsStr(), 'context_id': IsStr(), 'task_id': IsStr(), } ], 'artifacts': [ { 'artifact_id': IsStr(), 'name': 'result', 'parts': [ { 'metadata': {'json_schema': {'items': {}, 'type': 'array'}}, 'kind': 'data', 'data': {'result': ['foo', 'bar']}, } ], } ], } ) message = Message( role='user', parts=[TextPart(text='Did you get my first message?', kind='text')], kind='message', message_id=str(uuid.uuid4()), context_id=context_id, ) response = await a2a_client.send_message(message=message) assert response == snapshot( { 'jsonrpc': '2.0', 'id': IsStr(), 'result': { 'id': IsStr(), 'context_id': IsStr(), 'kind': 'task', 'status': {'state': 'submitted', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ { 'role': 'user', 'parts': [{'kind': 'text', 'text': 'Did you get my first message?'}], 'kind': 'message', 'message_id': IsStr(), 'context_id': IsStr(), 'task_id': IsStr(), } ], }, } ) while task := await a2a_client.get_task(task_id): # pragma: no branch if 'result' in task and task['result']['status']['state'] == 'completed': result = task['result'] break await anyio.sleep(0.1) # pragma: lax no cover assert result == snapshot( { 'id': IsStr(), 'context_id': IsStr(), 'kind': 'task', 'status': {'state': 'completed', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ { 'role': 'user', 'parts': [{'kind': 'text', 'text': 'Hello, world!'}], 'kind': 'message', 'message_id': IsStr(), 'context_id': IsStr(), 'task_id': IsStr(), } ], 'artifacts': [ { 'artifact_id': IsStr(), 'name': 'result', 'parts': [ { 'metadata': {'json_schema': {'items': {}, 'type': 'array'}}, 'kind': 'data', 'data': {'result': ['foo', 'bar']}, } ], } ], } )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pydantic/pydantic-ai'

If you have feedback or need assistance with the MCP directory API, please join our Discord server