Skip to main content
Glama
test_concurrent_writes.py19 kB
"""Tests for concurrent/parallel operations in MCP Agent Mail. Tests concurrent access patterns including: - Parallel message writes - Concurrent file reservation requests - Race conditions in lock acquisition - Parallel inbox fetches during message delivery """ from __future__ import annotations import asyncio import pytest from fastmcp import Client from sqlalchemy import text from mcp_agent_mail import config as _config from mcp_agent_mail.app import build_mcp_server from mcp_agent_mail.db import ensure_schema, get_session async def _setup_project_and_agents(settings: _config.Settings) -> dict: """Create test project and agents using MCP tools.""" await ensure_schema() server = build_mcp_server() async with Client(server) as client: # Create project via MCP tool await client.call_tool("ensure_project", {"human_key": "/tmp/concurrent-test"}) # Create multiple agents with auto-generated adjective+noun names agents = [] for i in range(5): result = await client.call_tool( "register_agent", { "project_key": "/tmp/concurrent-test", "program": "claude-code", "model": "opus-4", "task_description": f"Task {i}", }, ) # Extract the generated name from the result data = result.data if hasattr(result, "data") else {} if isinstance(data, dict) and "name" in data: agents.append(data["name"]) # Ensure we got all agents - fail fast if not assert len(agents) == 5, f"Expected 5 agents, got {len(agents)}: {agents}" # Get project_id from DB for reference async with get_session() as session: row = await session.execute( text("SELECT id FROM projects WHERE human_key = :hk"), {"hk": "/tmp/concurrent-test"}, ) project_id = row.scalar() return { "project_id": project_id, "project_slug": "tmp-concurrent-test", "agents": agents, } # ============================================================================= # Concurrent Message Write Tests # ============================================================================= @pytest.mark.asyncio async def test_concurrent_message_sends(isolated_env): """Test sending multiple messages concurrently.""" settings = _config.get_settings() data = await _setup_project_and_agents(settings) server = build_mcp_server() async def send_message(client: Client, sender: str, recipient: str, subject: str): """Send a message via the MCP tool.""" result = await client.call_tool( "send_message", { "project_key": "/tmp/concurrent-test", "sender_name": sender, "to": [recipient], "subject": subject, "body_md": f"Message from {sender} to {recipient}", }, ) return result # Send 10 messages concurrently async with Client(server) as client: tasks = [] for i in range(10): sender = data["agents"][i % 5] recipient = data["agents"][(i + 1) % 5] tasks.append(send_message(client, sender, recipient, f"Concurrent Message {i}")) results = await asyncio.gather(*tasks, return_exceptions=True) # Count successes (some may fail due to auto-registration issues, that's ok) successes = sum(1 for r in results if not isinstance(r, Exception)) assert successes >= 5, f"Expected at least 5 successful sends, got {successes}" @pytest.mark.asyncio async def test_concurrent_messages_to_same_thread(isolated_env): """Test multiple agents writing to the same thread concurrently.""" settings = _config.get_settings() data = await _setup_project_and_agents(settings) server = build_mcp_server() first_agent = data["agents"][0] async def send_to_thread(client: Client, sender: str, message_num: int): """Send a message to a shared thread.""" result = await client.call_tool( "send_message", { "project_key": "/tmp/concurrent-test", "sender_name": sender, "to": [first_agent], "subject": f"Thread Message {message_num}", "body_md": f"Message {message_num} from {sender}", "thread_id": "shared-thread-1", }, ) return result # Send 5 messages to same thread concurrently async with Client(server) as client: tasks = [send_to_thread(client, data["agents"][i], i) for i in range(5)] results = await asyncio.gather(*tasks, return_exceptions=True) # All should succeed or fail gracefully errors = [r for r in results if isinstance(r, Exception)] assert len(errors) < 3, f"Too many errors: {errors}" # ============================================================================= # Concurrent File Reservation Tests # ============================================================================= @pytest.mark.asyncio async def test_concurrent_file_reservation_different_paths(isolated_env): """Test concurrent file reservations on different paths.""" settings = _config.get_settings() data = await _setup_project_and_agents(settings) server = build_mcp_server() async def reserve_file(client: Client, agent: str, path: str): """Reserve a file path.""" result = await client.call_tool( "file_reservation_paths", { "project_key": "/tmp/concurrent-test", "agent_name": agent, "paths": [path], "ttl_seconds": 3600, "exclusive": True, "reason": f"Testing by {agent}", }, ) return result # Reserve different paths concurrently async with Client(server) as client: tasks = [reserve_file(client, data["agents"][i], f"src/module{i}.py") for i in range(5)] results = await asyncio.gather(*tasks, return_exceptions=True) # All should succeed (no conflicts) successes = sum(1 for r in results if not isinstance(r, Exception)) assert successes == 5, f"Expected all 5 reservations to succeed, got {successes}" @pytest.mark.asyncio async def test_concurrent_file_reservation_same_path_conflict(isolated_env): """Test concurrent file reservations on the same path detect conflicts.""" settings = _config.get_settings() data = await _setup_project_and_agents(settings) server = build_mcp_server() async def reserve_file(client: Client, agent: str): """Reserve the same file path.""" result = await client.call_tool( "file_reservation_paths", { "project_key": "/tmp/concurrent-test", "agent_name": agent, "paths": ["shared/config.json"], "ttl_seconds": 3600, "exclusive": True, "reason": f"Testing by {agent}", }, ) return result # Try to reserve the same path concurrently async with Client(server) as client: tasks = [reserve_file(client, data["agents"][i]) for i in range(3)] results = await asyncio.gather(*tasks, return_exceptions=True) # At least one should succeed, others may report conflicts successes = sum(1 for r in results if not isinstance(r, Exception)) assert successes >= 1, "At least one reservation should succeed" @pytest.mark.asyncio async def test_concurrent_file_reservation_overlapping_globs(isolated_env): """Test concurrent reservations with overlapping glob patterns.""" settings = _config.get_settings() data = await _setup_project_and_agents(settings) server = build_mcp_server() async with Client(server) as client: # First agent reserves broad pattern result1 = await client.call_tool( "file_reservation_paths", { "project_key": "/tmp/concurrent-test", "agent_name": data["agents"][0], "paths": ["src/**/*.py"], "ttl_seconds": 3600, "exclusive": True, }, ) # Second agent tries to reserve specific file in same pattern result2 = await client.call_tool( "file_reservation_paths", { "project_key": "/tmp/concurrent-test", "agent_name": data["agents"][1], "paths": ["src/app.py"], "ttl_seconds": 3600, "exclusive": True, }, ) # Second should report conflict with first # Result format varies, just check it doesn't crash assert result1 is not None assert result2 is not None # ============================================================================= # Concurrent Inbox Fetch Tests # ============================================================================= @pytest.mark.asyncio async def test_concurrent_inbox_fetches(isolated_env): """Test multiple concurrent inbox fetches.""" settings = _config.get_settings() data = await _setup_project_and_agents(settings) # First send some messages server = build_mcp_server() async with Client(server) as client: for i in range(5): await client.call_tool( "send_message", { "project_key": "/tmp/concurrent-test", "sender_name": data["agents"][(i + 1) % 5], "to": [data["agents"][0]], "subject": f"Test Message {i}", "body_md": f"Body {i}", }, ) async def fetch_inbox(c: Client): """Fetch inbox for Agent0.""" result = await c.call_tool( "fetch_inbox", { "project_key": "/tmp/concurrent-test", "agent_name": data["agents"][0], "limit": 100, }, ) return result # Fetch inbox concurrently 10 times tasks = [fetch_inbox(client) for _ in range(10)] results = await asyncio.gather(*tasks, return_exceptions=True) # All should succeed successes = sum(1 for r in results if not isinstance(r, Exception)) assert successes == 10, f"All inbox fetches should succeed, got {successes}" @pytest.mark.asyncio async def test_concurrent_inbox_fetch_during_message_send(isolated_env): """Test inbox fetch while messages are being sent.""" settings = _config.get_settings() data = await _setup_project_and_agents(settings) server = build_mcp_server() async def send_message(client: Client, i: int): """Send a message.""" await asyncio.sleep(0.01 * i) # Slight stagger return await client.call_tool( "send_message", { "project_key": "/tmp/concurrent-test", "sender_name": data["agents"][1], "to": [data["agents"][0]], "subject": f"Concurrent Send {i}", "body_md": f"Body {i}", }, ) async def fetch_inbox(client: Client): """Fetch inbox.""" await asyncio.sleep(0.05) # Slight delay return await client.call_tool( "fetch_inbox", { "project_key": "/tmp/concurrent-test", "agent_name": data["agents"][0], "limit": 100, }, ) # Run sends and fetches concurrently async with Client(server) as client: send_tasks = [send_message(client, i) for i in range(5)] fetch_tasks = [fetch_inbox(client) for _ in range(3)] results = await asyncio.gather(*send_tasks, *fetch_tasks, return_exceptions=True) # Should not crash errors = [r for r in results if isinstance(r, Exception)] assert len(errors) < len(results), "Some operations should succeed" # ============================================================================= # Lock Race Condition Tests # ============================================================================= @pytest.mark.asyncio async def test_concurrent_project_ensure(isolated_env): """Test concurrent project ensure calls.""" _config.get_settings() # Ensure settings are loaded await ensure_schema() server = build_mcp_server() async def ensure_project(client: Client, suffix: str): """Ensure a project exists.""" return await client.call_tool( "ensure_project", {"human_key": f"/tmp/race-test-{suffix}"}, ) # Call ensure_project concurrently for same project async with Client(server) as client: tasks = [ensure_project(client, "same") for _ in range(5)] results = await asyncio.gather(*tasks, return_exceptions=True) # All should succeed (idempotent operation) successes = sum(1 for r in results if not isinstance(r, Exception)) assert successes >= 4, f"Most ensures should succeed, got {successes}" @pytest.mark.asyncio async def test_concurrent_agent_registration(isolated_env): """Test concurrent agent registration.""" _config.get_settings() # Ensure settings are loaded await ensure_schema() server = build_mcp_server() async with Client(server) as client: # First ensure project await client.call_tool("ensure_project", {"human_key": "/tmp/reg-test"}) async def register_agent(c: Client, i: int): """Register an agent.""" return await c.call_tool( "register_agent", { "project_key": "/tmp/reg-test", "program": "claude-code", "model": "opus-4", "task_description": f"Task {i}", }, ) # Register multiple agents concurrently tasks = [register_agent(client, i) for i in range(10)] results = await asyncio.gather(*tasks, return_exceptions=True) # All should succeed with unique names successes = [r for r in results if not isinstance(r, Exception)] assert len(successes) >= 8, f"Most registrations should succeed, got {len(successes)}" # Verify unique names names = set() for r in successes: data = getattr(r, "data", None) if isinstance(data, dict) and "name" in data: names.add(data["name"]) # Names should be unique (if we could extract them) # ============================================================================= # Database Concurrent Access Tests # ============================================================================= @pytest.mark.asyncio async def test_concurrent_message_read_write(isolated_env): """Test concurrent reads and writes to messages table.""" _config.get_settings() # Ensure settings are loaded await ensure_schema() async with get_session() as session: await session.execute( text("INSERT INTO projects (slug, human_key, created_at) VALUES (:slug, :hk, datetime('now'))"), {"slug": "db-concurrent", "hk": "/tmp/db-concurrent"}, ) await session.commit() row = await session.execute(text("SELECT id FROM projects WHERE slug = :slug"), {"slug": "db-concurrent"}) project_id = row.scalar() async def write_message(i: int) -> None: """Write a message.""" async with get_session() as session: await session.execute( text( "INSERT INTO messages (project_id, subject, body_md, importance, ack_required, sender_id, created_ts) " "VALUES (:pid, :subj, :body, :imp, :ack, 1, datetime('now'))" ), {"pid": project_id, "subj": f"Msg {i}", "body": f"Body {i}", "imp": "normal", "ack": 0}, ) await session.commit() async def read_messages() -> int: """Read message count.""" async with get_session() as session: row = await session.execute( text("SELECT COUNT(*) FROM messages WHERE project_id = :pid"), {"pid": project_id}, ) return row.scalar() or 0 # Mix writes and reads write_tasks = [write_message(i) for i in range(10)] read_tasks = [read_messages() for _ in range(5)] results = await asyncio.gather(*write_tasks, *read_tasks, return_exceptions=True) # Should not crash errors = [r for r in results if isinstance(r, Exception)] assert len(errors) == 0, f"No errors expected: {errors}" # ============================================================================= # Archive Lock Tests # ============================================================================= @pytest.mark.asyncio async def test_concurrent_archive_writes(isolated_env): """Test concurrent writes to the same archive.""" settings = _config.get_settings() from mcp_agent_mail.storage import ensure_archive, write_agent_profile archive = await ensure_archive(settings, "archive-lock-test") async def write_profile(i: int) -> None: """Write an agent profile.""" await write_agent_profile( archive, { "name": f"Agent{i}", "program": "claude-code", "model": "opus-4", "task_description": f"Task {i}", }, ) # Write profiles concurrently tasks = [write_profile(i) for i in range(5)] results = await asyncio.gather(*tasks, return_exceptions=True) # Should handle concurrent access errors = [r for r in results if isinstance(r, Exception)] assert len(errors) < 3, f"Most writes should succeed: {errors}" @pytest.mark.asyncio async def test_concurrent_message_bundle_writes(isolated_env): """Test concurrent message bundle writes to archive.""" settings = _config.get_settings() from mcp_agent_mail.storage import ensure_archive, write_message_bundle archive = await ensure_archive(settings, "bundle-lock-test") async def write_bundle(i: int) -> None: """Write a message bundle.""" await write_message_bundle( archive, message={"id": i, "subject": f"Subject {i}"}, body_md=f"Body {i}", sender=f"Sender{i}", recipients=[f"Recipient{i}"], ) # Write bundles concurrently tasks = [write_bundle(i) for i in range(10)] results = await asyncio.gather(*tasks, return_exceptions=True) # Should handle concurrent access (archive lock) errors = [r for r in results if isinstance(r, Exception)] assert len(errors) < 5, f"Most writes should succeed: {errors}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Dicklesworthstone/mcp_agent_mail'

If you have feedback or need assistance with the MCP directory API, please join our Discord server