"""Tests for concurrent/parallel operations in MCP Agent Mail.
Tests concurrent access patterns including:
- Parallel message writes
- Concurrent file reservation requests
- Race conditions in lock acquisition
- Parallel inbox fetches during message delivery
"""
from __future__ import annotations
import asyncio
import pytest
from fastmcp import Client
from sqlalchemy import text
from mcp_agent_mail import config as _config
from mcp_agent_mail.app import build_mcp_server
from mcp_agent_mail.db import ensure_schema, get_session
async def _setup_project_and_agents(settings: _config.Settings) -> dict:
"""Create test project and agents using MCP tools."""
await ensure_schema()
server = build_mcp_server()
async with Client(server) as client:
# Create project via MCP tool
await client.call_tool("ensure_project", {"human_key": "/tmp/concurrent-test"})
# Create multiple agents with auto-generated adjective+noun names
agents = []
for i in range(5):
result = await client.call_tool(
"register_agent",
{
"project_key": "/tmp/concurrent-test",
"program": "claude-code",
"model": "opus-4",
"task_description": f"Task {i}",
},
)
# Extract the generated name from the result
data = result.data if hasattr(result, "data") else {}
if isinstance(data, dict) and "name" in data:
agents.append(data["name"])
# Ensure we got all agents - fail fast if not
assert len(agents) == 5, f"Expected 5 agents, got {len(agents)}: {agents}"
# Get project_id from DB for reference
async with get_session() as session:
row = await session.execute(
text("SELECT id FROM projects WHERE human_key = :hk"),
{"hk": "/tmp/concurrent-test"},
)
project_id = row.scalar()
return {
"project_id": project_id,
"project_slug": "tmp-concurrent-test",
"agents": agents,
}
# =============================================================================
# Concurrent Message Write Tests
# =============================================================================
@pytest.mark.asyncio
async def test_concurrent_message_sends(isolated_env):
"""Test sending multiple messages concurrently."""
settings = _config.get_settings()
data = await _setup_project_and_agents(settings)
server = build_mcp_server()
async def send_message(client: Client, sender: str, recipient: str, subject: str):
"""Send a message via the MCP tool."""
result = await client.call_tool(
"send_message",
{
"project_key": "/tmp/concurrent-test",
"sender_name": sender,
"to": [recipient],
"subject": subject,
"body_md": f"Message from {sender} to {recipient}",
},
)
return result
# Send 10 messages concurrently
async with Client(server) as client:
tasks = []
for i in range(10):
sender = data["agents"][i % 5]
recipient = data["agents"][(i + 1) % 5]
tasks.append(send_message(client, sender, recipient, f"Concurrent Message {i}"))
results = await asyncio.gather(*tasks, return_exceptions=True)
# Count successes (some may fail due to auto-registration issues, that's ok)
successes = sum(1 for r in results if not isinstance(r, Exception))
assert successes >= 5, f"Expected at least 5 successful sends, got {successes}"
@pytest.mark.asyncio
async def test_concurrent_messages_to_same_thread(isolated_env):
"""Test multiple agents writing to the same thread concurrently."""
settings = _config.get_settings()
data = await _setup_project_and_agents(settings)
server = build_mcp_server()
first_agent = data["agents"][0]
async def send_to_thread(client: Client, sender: str, message_num: int):
"""Send a message to a shared thread."""
result = await client.call_tool(
"send_message",
{
"project_key": "/tmp/concurrent-test",
"sender_name": sender,
"to": [first_agent],
"subject": f"Thread Message {message_num}",
"body_md": f"Message {message_num} from {sender}",
"thread_id": "shared-thread-1",
},
)
return result
# Send 5 messages to same thread concurrently
async with Client(server) as client:
tasks = [send_to_thread(client, data["agents"][i], i) for i in range(5)]
results = await asyncio.gather(*tasks, return_exceptions=True)
# All should succeed or fail gracefully
errors = [r for r in results if isinstance(r, Exception)]
assert len(errors) < 3, f"Too many errors: {errors}"
# =============================================================================
# Concurrent File Reservation Tests
# =============================================================================
@pytest.mark.asyncio
async def test_concurrent_file_reservation_different_paths(isolated_env):
"""Test concurrent file reservations on different paths."""
settings = _config.get_settings()
data = await _setup_project_and_agents(settings)
server = build_mcp_server()
async def reserve_file(client: Client, agent: str, path: str):
"""Reserve a file path."""
result = await client.call_tool(
"file_reservation_paths",
{
"project_key": "/tmp/concurrent-test",
"agent_name": agent,
"paths": [path],
"ttl_seconds": 3600,
"exclusive": True,
"reason": f"Testing by {agent}",
},
)
return result
# Reserve different paths concurrently
async with Client(server) as client:
tasks = [reserve_file(client, data["agents"][i], f"src/module{i}.py") for i in range(5)]
results = await asyncio.gather(*tasks, return_exceptions=True)
# All should succeed (no conflicts)
successes = sum(1 for r in results if not isinstance(r, Exception))
assert successes == 5, f"Expected all 5 reservations to succeed, got {successes}"
@pytest.mark.asyncio
async def test_concurrent_file_reservation_same_path_conflict(isolated_env):
"""Test concurrent file reservations on the same path detect conflicts."""
settings = _config.get_settings()
data = await _setup_project_and_agents(settings)
server = build_mcp_server()
async def reserve_file(client: Client, agent: str):
"""Reserve the same file path."""
result = await client.call_tool(
"file_reservation_paths",
{
"project_key": "/tmp/concurrent-test",
"agent_name": agent,
"paths": ["shared/config.json"],
"ttl_seconds": 3600,
"exclusive": True,
"reason": f"Testing by {agent}",
},
)
return result
# Try to reserve the same path concurrently
async with Client(server) as client:
tasks = [reserve_file(client, data["agents"][i]) for i in range(3)]
results = await asyncio.gather(*tasks, return_exceptions=True)
# At least one should succeed, others may report conflicts
successes = sum(1 for r in results if not isinstance(r, Exception))
assert successes >= 1, "At least one reservation should succeed"
@pytest.mark.asyncio
async def test_concurrent_file_reservation_overlapping_globs(isolated_env):
"""Test concurrent reservations with overlapping glob patterns."""
settings = _config.get_settings()
data = await _setup_project_and_agents(settings)
server = build_mcp_server()
async with Client(server) as client:
# First agent reserves broad pattern
result1 = await client.call_tool(
"file_reservation_paths",
{
"project_key": "/tmp/concurrent-test",
"agent_name": data["agents"][0],
"paths": ["src/**/*.py"],
"ttl_seconds": 3600,
"exclusive": True,
},
)
# Second agent tries to reserve specific file in same pattern
result2 = await client.call_tool(
"file_reservation_paths",
{
"project_key": "/tmp/concurrent-test",
"agent_name": data["agents"][1],
"paths": ["src/app.py"],
"ttl_seconds": 3600,
"exclusive": True,
},
)
# Second should report conflict with first
# Result format varies, just check it doesn't crash
assert result1 is not None
assert result2 is not None
# =============================================================================
# Concurrent Inbox Fetch Tests
# =============================================================================
@pytest.mark.asyncio
async def test_concurrent_inbox_fetches(isolated_env):
"""Test multiple concurrent inbox fetches."""
settings = _config.get_settings()
data = await _setup_project_and_agents(settings)
# First send some messages
server = build_mcp_server()
async with Client(server) as client:
for i in range(5):
await client.call_tool(
"send_message",
{
"project_key": "/tmp/concurrent-test",
"sender_name": data["agents"][(i + 1) % 5],
"to": [data["agents"][0]],
"subject": f"Test Message {i}",
"body_md": f"Body {i}",
},
)
async def fetch_inbox(c: Client):
"""Fetch inbox for Agent0."""
result = await c.call_tool(
"fetch_inbox",
{
"project_key": "/tmp/concurrent-test",
"agent_name": data["agents"][0],
"limit": 100,
},
)
return result
# Fetch inbox concurrently 10 times
tasks = [fetch_inbox(client) for _ in range(10)]
results = await asyncio.gather(*tasks, return_exceptions=True)
# All should succeed
successes = sum(1 for r in results if not isinstance(r, Exception))
assert successes == 10, f"All inbox fetches should succeed, got {successes}"
@pytest.mark.asyncio
async def test_concurrent_inbox_fetch_during_message_send(isolated_env):
"""Test inbox fetch while messages are being sent."""
settings = _config.get_settings()
data = await _setup_project_and_agents(settings)
server = build_mcp_server()
async def send_message(client: Client, i: int):
"""Send a message."""
await asyncio.sleep(0.01 * i) # Slight stagger
return await client.call_tool(
"send_message",
{
"project_key": "/tmp/concurrent-test",
"sender_name": data["agents"][1],
"to": [data["agents"][0]],
"subject": f"Concurrent Send {i}",
"body_md": f"Body {i}",
},
)
async def fetch_inbox(client: Client):
"""Fetch inbox."""
await asyncio.sleep(0.05) # Slight delay
return await client.call_tool(
"fetch_inbox",
{
"project_key": "/tmp/concurrent-test",
"agent_name": data["agents"][0],
"limit": 100,
},
)
# Run sends and fetches concurrently
async with Client(server) as client:
send_tasks = [send_message(client, i) for i in range(5)]
fetch_tasks = [fetch_inbox(client) for _ in range(3)]
results = await asyncio.gather(*send_tasks, *fetch_tasks, return_exceptions=True)
# Should not crash
errors = [r for r in results if isinstance(r, Exception)]
assert len(errors) < len(results), "Some operations should succeed"
# =============================================================================
# Lock Race Condition Tests
# =============================================================================
@pytest.mark.asyncio
async def test_concurrent_project_ensure(isolated_env):
"""Test concurrent project ensure calls."""
_config.get_settings() # Ensure settings are loaded
await ensure_schema()
server = build_mcp_server()
async def ensure_project(client: Client, suffix: str):
"""Ensure a project exists."""
return await client.call_tool(
"ensure_project",
{"human_key": f"/tmp/race-test-{suffix}"},
)
# Call ensure_project concurrently for same project
async with Client(server) as client:
tasks = [ensure_project(client, "same") for _ in range(5)]
results = await asyncio.gather(*tasks, return_exceptions=True)
# All should succeed (idempotent operation)
successes = sum(1 for r in results if not isinstance(r, Exception))
assert successes >= 4, f"Most ensures should succeed, got {successes}"
@pytest.mark.asyncio
async def test_concurrent_agent_registration(isolated_env):
"""Test concurrent agent registration."""
_config.get_settings() # Ensure settings are loaded
await ensure_schema()
server = build_mcp_server()
async with Client(server) as client:
# First ensure project
await client.call_tool("ensure_project", {"human_key": "/tmp/reg-test"})
async def register_agent(c: Client, i: int):
"""Register an agent."""
return await c.call_tool(
"register_agent",
{
"project_key": "/tmp/reg-test",
"program": "claude-code",
"model": "opus-4",
"task_description": f"Task {i}",
},
)
# Register multiple agents concurrently
tasks = [register_agent(client, i) for i in range(10)]
results = await asyncio.gather(*tasks, return_exceptions=True)
# All should succeed with unique names
successes = [r for r in results if not isinstance(r, Exception)]
assert len(successes) >= 8, f"Most registrations should succeed, got {len(successes)}"
# Verify unique names
names = set()
for r in successes:
data = getattr(r, "data", None)
if isinstance(data, dict) and "name" in data:
names.add(data["name"])
# Names should be unique (if we could extract them)
# =============================================================================
# Database Concurrent Access Tests
# =============================================================================
@pytest.mark.asyncio
async def test_concurrent_message_read_write(isolated_env):
"""Test concurrent reads and writes to messages table."""
_config.get_settings() # Ensure settings are loaded
await ensure_schema()
async with get_session() as session:
await session.execute(
text("INSERT INTO projects (slug, human_key, created_at) VALUES (:slug, :hk, datetime('now'))"),
{"slug": "db-concurrent", "hk": "/tmp/db-concurrent"},
)
await session.commit()
row = await session.execute(text("SELECT id FROM projects WHERE slug = :slug"), {"slug": "db-concurrent"})
project_id = row.scalar()
async def write_message(i: int) -> None:
"""Write a message."""
async with get_session() as session:
await session.execute(
text(
"INSERT INTO messages (project_id, subject, body_md, importance, ack_required, sender_id, created_ts) "
"VALUES (:pid, :subj, :body, :imp, :ack, 1, datetime('now'))"
),
{"pid": project_id, "subj": f"Msg {i}", "body": f"Body {i}", "imp": "normal", "ack": 0},
)
await session.commit()
async def read_messages() -> int:
"""Read message count."""
async with get_session() as session:
row = await session.execute(
text("SELECT COUNT(*) FROM messages WHERE project_id = :pid"),
{"pid": project_id},
)
return row.scalar() or 0
# Mix writes and reads
write_tasks = [write_message(i) for i in range(10)]
read_tasks = [read_messages() for _ in range(5)]
results = await asyncio.gather(*write_tasks, *read_tasks, return_exceptions=True)
# Should not crash
errors = [r for r in results if isinstance(r, Exception)]
assert len(errors) == 0, f"No errors expected: {errors}"
# =============================================================================
# Archive Lock Tests
# =============================================================================
@pytest.mark.asyncio
async def test_concurrent_archive_writes(isolated_env):
"""Test concurrent writes to the same archive."""
settings = _config.get_settings()
from mcp_agent_mail.storage import ensure_archive, write_agent_profile
archive = await ensure_archive(settings, "archive-lock-test")
async def write_profile(i: int) -> None:
"""Write an agent profile."""
await write_agent_profile(
archive,
{
"name": f"Agent{i}",
"program": "claude-code",
"model": "opus-4",
"task_description": f"Task {i}",
},
)
# Write profiles concurrently
tasks = [write_profile(i) for i in range(5)]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Should handle concurrent access
errors = [r for r in results if isinstance(r, Exception)]
assert len(errors) < 3, f"Most writes should succeed: {errors}"
@pytest.mark.asyncio
async def test_concurrent_message_bundle_writes(isolated_env):
"""Test concurrent message bundle writes to archive."""
settings = _config.get_settings()
from mcp_agent_mail.storage import ensure_archive, write_message_bundle
archive = await ensure_archive(settings, "bundle-lock-test")
async def write_bundle(i: int) -> None:
"""Write a message bundle."""
await write_message_bundle(
archive,
message={"id": i, "subject": f"Subject {i}"},
body_md=f"Body {i}",
sender=f"Sender{i}",
recipients=[f"Recipient{i}"],
)
# Write bundles concurrently
tasks = [write_bundle(i) for i in range(10)]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Should handle concurrent access (archive lock)
errors = [r for r in results if isinstance(r, Exception)]
assert len(errors) < 5, f"Most writes should succeed: {errors}"