# -*- coding: utf-8 -*-
"""Location: ./tests/integration/test_concurrency_row_locking.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Mihai Criveti
Concurrency tests for row-level locking implementation.
Tests verify that concurrent operations on tools, gateways, prompts, resources,
A2A agents, and servers handle race conditions correctly using PostgreSQL row-level locking.
This test suite validates:
1. Tool creation with duplicate names (public and team visibility)
2. Tool updates with name conflicts
3. Tool toggle operations under concurrent load
4. Gateway creation with duplicate slugs (public and team visibility)
5. Gateway updates with slug conflicts
6. Prompt creation with duplicate names and concurrent operations
7. Resource creation with duplicate URIs and concurrent operations
8. A2A agent creation with duplicate names and concurrent operations
9. Server creation with duplicate names and concurrent operations
10. Mixed concurrent operations (create, update, toggle, read, delete)
11. High concurrency scenarios with unique entities
12. Row-level locking with skip_locked behavior
13. Mixed visibility operations (public, team, private)
"""
# Standard
import asyncio
import os
import uuid
# Third-Party
from httpx import AsyncClient
import pytest
import pytest_asyncio
# First-Party
from mcpgateway.db import get_db
# Set environment variables for testing
os.environ["MCPGATEWAY_ADMIN_API_ENABLED"] = "true"
os.environ["MCPGATEWAY_UI_ENABLED"] = "true"
os.environ["MCPGATEWAY_A2A_ENABLED"] = "true"
def is_postgresql() -> bool:
"""Check if PostgreSQL is configured via DB env var or DATABASE_URL."""
db_env = os.getenv("DB", "").lower()
database_url = os.getenv("DATABASE_URL", "").lower()
return db_env == "postgres" or "postgresql" in database_url
# Skip condition for PostgreSQL-only tests
SKIP_IF_NOT_POSTGRES = pytest.mark.skipif(not is_postgresql(), reason="Row-level locking only works on PostgreSQL")
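# Outline of the pattern these tests exercise (an illustrative sketch, not the
# gateway's actual implementation): before inserting or renaming an entity, the
# service locks any row already holding the target name so concurrent writers
# serialize on it, e.g. via PostgreSQL's SELECT ... FOR UPDATE. The model and
# exception names below (DbTool, ToolNameConflictError) are assumptions for
# illustration only.
#
#     existing = db.execute(
#         select(DbTool).where(DbTool.name == name).with_for_update()
#     ).scalar_one_or_none()
#     if existing is not None:
#         raise ToolNameConflictError(name)  # surfaces to the client as HTTP 409
#     db.add(DbTool(name=name, ...))
#     db.commit()  # commit releases the row lock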
def create_test_jwt_token():
"""Create a proper JWT token for testing."""
import datetime
import jwt
expire = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=60)
payload = {
"sub": "admin@example.com",
"email": "admin@example.com",
"iat": int(datetime.datetime.now(datetime.timezone.utc).timestamp()),
"exp": int(expire.timestamp()),
"iss": "mcpgateway",
"aud": "mcpgateway-api",
"teams": [],
}
return jwt.encode(payload, "my-test-key", algorithm="HS256")
TEST_JWT_TOKEN = create_test_jwt_token()
TEST_AUTH_HEADER = {"Authorization": f"Bearer {TEST_JWT_TOKEN}"}
@pytest_asyncio.fixture
async def client(app_with_temp_db):
"""Create test client with authentication mocked."""
from mcpgateway.auth import get_current_user
from mcpgateway.middleware.rbac import get_current_user_with_permissions
from mcpgateway.utils.verify_credentials import require_admin_auth
from mcpgateway.db import EmailUser
from tests.utils.rbac_mocks import create_mock_email_user, create_mock_user_context
TEST_USER = create_mock_email_user(email="admin@example.com", full_name="Test Admin", is_admin=True, is_active=True)
test_db_dependency = app_with_temp_db.dependency_overrides.get(get_db) or get_db
def get_test_db_session():
if callable(test_db_dependency):
return next(test_db_dependency())
return test_db_dependency
test_db_session = get_test_db_session()
# Create admin user in database for permission checks (if not exists)
from sqlalchemy import select
existing_user = test_db_session.execute(select(EmailUser).where(EmailUser.email == "admin@example.com")).scalar_one_or_none()
if not existing_user:
admin_user = EmailUser(email="admin@example.com", full_name="Test Admin", is_admin=True, is_active=True, password_hash="dummy_hash")
test_db_session.add(admin_user)
test_db_session.commit()
test_user_context = create_mock_user_context(email="admin@example.com", full_name="Test Admin", is_admin=True)
test_user_context["db"] = test_db_session
async def mock_require_admin_auth():
return "admin@example.com"
# Mock only the gateway initialization to prevent actual connection attempts
# but keep the database operations intact
from mcpgateway import admin
original_initialize = admin.gateway_service._initialize_gateway
async def mock_initialize_gateway(*args, **kwargs):
# Return mock data without actually connecting
return ({"capabilities": {}}, [], [], []) # capabilities # tools # resources # prompts
admin.gateway_service._initialize_gateway = mock_initialize_gateway
app_with_temp_db.dependency_overrides[get_current_user] = lambda: TEST_USER
app_with_temp_db.dependency_overrides[get_current_user_with_permissions] = lambda: test_user_context
app_with_temp_db.dependency_overrides[require_admin_auth] = mock_require_admin_auth
from httpx import ASGITransport  # AsyncClient is already imported at module level
transport = ASGITransport(app=app_with_temp_db)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
yield ac
# Restore original initialization method
admin.gateway_service._initialize_gateway = original_initialize
app_with_temp_db.dependency_overrides.pop(get_current_user, None)
app_with_temp_db.dependency_overrides.pop(get_current_user_with_permissions, None)
app_with_temp_db.dependency_overrides.pop(require_admin_auth, None)
# -------------------------
# Tool Concurrency Tests
# -------------------------
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_tool_creation_same_name(client: AsyncClient):
"""Test concurrent tool creation with same name prevents duplicates."""
tool_name = f"test-tool-{uuid.uuid4()}"
async def create_tool():
form_data = {"name": tool_name, "url": "http://example.com/tool", "description": "Test tool", "integrationType": "REST", "requestType": "GET", "visibility": "public"}
return await client.post("/admin/tools", data=form_data, headers=TEST_AUTH_HEADER)
# Run 10 concurrent creations with same name
results = await asyncio.gather(*[create_tool() for _ in range(10)], return_exceptions=True)
# Count successful creations (200) and conflicts (409)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
conflict_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 409)
# Exactly one should succeed, rest should be conflicts
assert success_count == 1, f"Expected 1 success, got {success_count}"
assert conflict_count == 9, f"Expected 9 conflicts, got {conflict_count}"
# No 500 errors
assert all(isinstance(r, Exception) or r.status_code in [200, 409] for r in results), "Some requests returned unexpected status codes"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_tool_update_same_name(client: AsyncClient):
"""Test concurrent tool updates to same name prevents duplicates."""
# Create two tools
tool1_name = f"tool-1-{uuid.uuid4()}"
tool2_name = f"tool-2-{uuid.uuid4()}"
tool1_data = {"name": tool1_name, "url": "http://example.com/tool1", "description": "Tool 1", "integrationType": "REST", "requestType": "GET", "visibility": "public"}
tool2_data = {"name": tool2_name, "url": "http://example.com/tool2", "description": "Tool 2", "integrationType": "REST", "requestType": "GET", "visibility": "public"}
resp1 = await client.post("/admin/tools", data=tool1_data, headers=TEST_AUTH_HEADER)
resp2 = await client.post("/admin/tools", data=tool2_data, headers=TEST_AUTH_HEADER)
assert resp1.status_code == 200
assert resp2.status_code == 200
# Get tool IDs by listing tools
list_resp = await client.get("/admin/tools", headers=TEST_AUTH_HEADER)
assert list_resp.status_code == 200
tools = list_resp.json()["data"]
tool1 = next((t for t in tools if t["name"] == tool1_name), None)
tool2 = next((t for t in tools if t["name"] == tool2_name), None)
assert tool1 is not None and tool2 is not None
tool1_id = tool1["id"]
tool2_id = tool2["id"]
target_name = f"target-tool-{uuid.uuid4()}"
async def update_tool(tool_id: str):
update_data = {"name": target_name, "customName": target_name, "url": "http://example.com/updated", "requestType": "GET", "integrationType": "REST", "headers": "{}", "input_schema": "{}"}
return await client.post(f"/admin/tools/{tool_id}/edit", data=update_data, headers=TEST_AUTH_HEADER)
# Try to update both tools to same name concurrently
results = await asyncio.gather(*[update_tool(tool1_id), update_tool(tool2_id)], return_exceptions=True)
# With row locking, one update should succeed and the other should fail with a conflict.
# However, because each update locks a different row, both may commit before either
# observes the other's new name, so two successes are also acceptable.
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code in [200, 303])
_conflict_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 409) # noqa: F841
# At least one should succeed
assert success_count >= 1, f"Expected at least 1 success, got {success_count}. Results: {[(r.status_code if not isinstance(r, Exception) else str(r)) for r in results]}"
# If both succeeded, verify no data corruption occurred
if success_count == 2:
# Both updates succeeded - verify final state is consistent
final_list = await client.get("/admin/tools", headers=TEST_AUTH_HEADER)
assert final_list.status_code == 200
final_tools = final_list.json()["data"]
# At least one tool should now carry the target name (last write wins)
tools_with_target_name = [t for t in final_tools if t["name"] == target_name or t.get("customName") == target_name]
assert len(tools_with_target_name) >= 1, "At least one tool should have the target name"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_tool_toggle(client: AsyncClient):
"""Test concurrent enable/disable doesn't cause race condition."""
# Create a tool
tool_name = f"toggle-tool-{uuid.uuid4()}"
tool_data = {"name": tool_name, "url": "http://example.com/tool", "description": "Toggle test tool", "integrationType": "REST", "requestType": "GET", "visibility": "public"}
resp = await client.post("/admin/tools", data=tool_data, headers=TEST_AUTH_HEADER)
assert resp.status_code == 200
# Get tool ID by listing tools
list_resp = await client.get("/admin/tools", headers=TEST_AUTH_HEADER)
assert list_resp.status_code == 200
tools = list_resp.json()["data"]
tool = next((t for t in tools if t["name"] == tool_name), None)
assert tool is not None
tool_id = tool["id"]
async def toggle():
return await client.post(f"/admin/tools/{tool_id}/toggle", data={}, headers=TEST_AUTH_HEADER)
# Run 20 concurrent toggles
results = await asyncio.gather(*[toggle() for _ in range(20)], return_exceptions=True)
# All should succeed or fail cleanly (no 500 errors)
assert all(isinstance(r, Exception) or r.status_code in [200, 303, 404, 409] for r in results), "Some requests returned unexpected status codes"
# Verify final state is consistent by listing tools
list_resp = await client.get("/admin/tools", headers=TEST_AUTH_HEADER)
assert list_resp.status_code == 200
tools = list_resp.json()["data"]
final_tool = next((t for t in tools if t["id"] == tool_id), None)
assert final_tool is not None
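# A minimal sketch of a lock-protected toggle that avoids lost updates under the
# concurrency above (illustrative only; DbTool, ToolNotFoundError, and the enabled
# column are assumed names, not necessarily the gateway's real service code):
#
#     tool = db.execute(
#         select(DbTool).where(DbTool.id == tool_id).with_for_update()
#     ).scalar_one_or_none()
#     if tool is None:
#         raise ToolNotFoundError(tool_id)  # surfaces as HTTP 404
#     tool.enabled = not tool.enabled  # read-modify-write happens under the row lock
#     db.commit()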
# -------------------------
# Gateway Concurrency Tests
# -------------------------
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_gateway_creation_same_slug(client: AsyncClient):
"""Test concurrent gateway creation with same slug prevents duplicates."""
gateway_name = f"Test Gateway {uuid.uuid4()}"
async def create_gateway():
gateway_data = {"name": gateway_name, "url": "http://example.com/gateway", "description": "Test gateway", "visibility": "public", "transport": "SSE"}
return await client.post("/admin/gateways", data=gateway_data, headers=TEST_AUTH_HEADER)
# Run 10 concurrent creations with same name (will generate same slug)
results = await asyncio.gather(*[create_gateway() for _ in range(10)], return_exceptions=True)
# Count successful creations and conflicts
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
conflict_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 409)
validation_error_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 422)
error_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code >= 500)
# Exactly one should succeed, rest should be conflicts or validation errors
assert success_count == 1, f"Expected 1 success, got {success_count}. Status codes: {[r.status_code for r in results if not isinstance(r, Exception)]}"
# At least 7 should be conflicts or validation errors (allowing for some timing variations)
assert (
conflict_count + validation_error_count
) >= 7, f"Expected at least 7 conflicts/validation errors, got {conflict_count} conflicts + {validation_error_count} validation errors = {conflict_count + validation_error_count}"
assert error_count == 0, f"Expected no 500 errors, got {error_count}"
# All non-exception responses should be either success, conflict, or validation error
assert all(
isinstance(r, Exception) or r.status_code in [200, 409, 422] for r in results
), f"Some requests returned unexpected status codes: {[r.status_code for r in results if not isinstance(r, Exception)]}"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_gateway_update_same_slug(client: AsyncClient):
"""Test concurrent gateway updates to same slug prevents duplicates."""
# Create two gateways
gateway1_name = f"Gateway 1 {uuid.uuid4()}"
gateway2_name = f"Gateway 2 {uuid.uuid4()}"
gateway1_data = {"name": gateway1_name, "url": "http://example.com/gateway1", "description": "Gateway 1", "visibility": "public", "transport": "SSE"}
gateway2_data = {"name": gateway2_name, "url": "http://example.com/gateway2", "description": "Gateway 2", "visibility": "public", "transport": "SSE"}
resp1 = await client.post("/admin/gateways", data=gateway1_data, headers=TEST_AUTH_HEADER)
resp2 = await client.post("/admin/gateways", data=gateway2_data, headers=TEST_AUTH_HEADER)
assert resp1.status_code == 200
assert resp2.status_code == 200
# Get gateway IDs by listing gateways
list_resp = await client.get("/admin/gateways", headers=TEST_AUTH_HEADER)
assert list_resp.status_code == 200
gateways = list_resp.json()["data"]
gateway1 = next((g for g in gateways if g["name"] == gateway1_name), None)
gateway2 = next((g for g in gateways if g["name"] == gateway2_name), None)
assert gateway1 is not None and gateway2 is not None
gateway1_id = gateway1["id"]
gateway2_id = gateway2["id"]
target_name = f"Target Gateway {uuid.uuid4()}"
async def update_gateway(gateway_id: str):
update_data = {"name": target_name, "url": "http://example.com/updated"}
return await client.post(f"/admin/gateways/{gateway_id}/edit", data=update_data, headers=TEST_AUTH_HEADER)
# Try to update both gateways to same name concurrently
results = await asyncio.gather(*[update_gateway(gateway1_id), update_gateway(gateway2_id)], return_exceptions=True)
# Count results - in concurrent updates, we expect at least one success
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code in [200, 303])
_conflict_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 409) # noqa: F841
_error_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code >= 500) # noqa: F841
# At least one should succeed
assert success_count >= 1, f"Expected at least 1 success, got {success_count}"
# Note: Gateway updates may encounter errors during concurrent operations
# This is acceptable as long as at least one update succeeds
# The test verifies that concurrent operations don't cause data corruption
# -------------------------
# Mixed Concurrency Tests
# -------------------------
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_mixed_operations(client: AsyncClient):
"""Test mixed concurrent operations (create, update, toggle) work correctly."""
# Create initial tool
tool_name = f"mixed-tool-{uuid.uuid4()}"
tool_data = {"name": tool_name, "url": "http://example.com/tool", "description": "Mixed test tool", "integrationType": "REST", "requestType": "GET", "visibility": "public"}
resp = await client.post("/admin/tools", data=tool_data, headers=TEST_AUTH_HEADER)
assert resp.status_code == 200
# Get tool ID by listing tools
list_resp = await client.get("/admin/tools", headers=TEST_AUTH_HEADER)
assert list_resp.status_code == 200
tools = list_resp.json()["data"]
tool = next((t for t in tools if t["name"] == tool_name), None)
assert tool is not None
tool_id = tool["id"]
async def update_tool():
update_data = {
"name": tool_name,
"customName": tool_name,
"url": "http://example.com/tool",
"description": f"Updated at {uuid.uuid4()}",
"requestType": "GET",
"integrationType": "REST",
"headers": "{}",
"input_schema": "{}",
}
return await client.post(f"/admin/tools/{tool_id}/edit", data=update_data, headers=TEST_AUTH_HEADER)
async def toggle_tool():
return await client.post(f"/admin/tools/{tool_id}/toggle", data={}, headers=TEST_AUTH_HEADER)
async def read_tool():
return await client.get("/admin/tools", headers=TEST_AUTH_HEADER)
# Mix of operations
operations = [update_tool() for _ in range(5)] + [toggle_tool() for _ in range(5)] + [read_tool() for _ in range(5)]
results = await asyncio.gather(*operations, return_exceptions=True)
# All should complete without 500 errors
assert all(isinstance(r, Exception) or r.status_code in [200, 303, 404, 409] for r in results), "Some requests returned unexpected status codes"
# Verify final state is consistent
final_resp = await client.get("/admin/tools", headers=TEST_AUTH_HEADER)
assert final_resp.status_code == 200
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_high_concurrency_tool_creation(client: AsyncClient):
"""Test high concurrency with many unique tool creations."""
async def create_unique_tool(index: int):
tool_data = {
"name": f"concurrent-tool-{index}-{uuid.uuid4()}",
"url": f"http://example.com/tool{index}",
"description": f"Concurrent test tool {index}",
"integrationType": "REST",
"requestType": "GET",
"visibility": "public",
}
return await client.post("/admin/tools", data=tool_data, headers=TEST_AUTH_HEADER)
# Create 50 tools concurrently
results = await asyncio.gather(*[create_unique_tool(i) for i in range(50)], return_exceptions=True)
# All should succeed (different names)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
failed_count = sum(1 for r in results if isinstance(r, Exception) or (hasattr(r, "status_code") and r.status_code != 200))
error_500_count = sum(1 for r in results if not isinstance(r, Exception) and hasattr(r, "status_code") and r.status_code >= 500)
# Allow for occasional failures due to connection pool exhaustion under extreme concurrency
# At least 48 out of 50 should succeed (96% success rate)
assert success_count >= 48, f"Expected at least 48 successes, got {success_count} (failed: {failed_count})"
# No 500 errors - those indicate server bugs, not resource exhaustion
assert error_500_count == 0, f"Got {error_500_count} server errors (500+), which indicates bugs not resource limits"
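# Note on the failure allowance above: SQLAlchemy's default QueuePool (pool_size=5,
# max_overflow=10) can be exhausted by 50 simultaneous requests. A hypothetical
# engine configuration with more headroom for local runs (values are illustrative,
# not the gateway's actual settings):
#
#     engine = create_engine(DATABASE_URL, pool_size=20, max_overflow=30, pool_timeout=30)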
# -------------------------
# Team-Scoped Tool Tests
# -------------------------
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_team_tool_creation_same_name(client: AsyncClient):
"""Test concurrent team tool creation with same name prevents duplicates within team."""
tool_name = f"team-tool-{uuid.uuid4()}"
async def create_team_tool():
form_data = {"name": tool_name, "url": "http://example.com/tool", "description": "Team test tool", "integrationType": "REST", "requestType": "GET", "visibility": "team"}
return await client.post("/admin/tools", data=form_data, headers=TEST_AUTH_HEADER)
# Run 10 concurrent creations with same name for team visibility
results = await asyncio.gather(*[create_team_tool() for _ in range(10)], return_exceptions=True)
# Count successful creations (200) and conflicts (409)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
conflict_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 409)
# Exactly one should succeed, rest should be conflicts
assert success_count == 1, f"Expected 1 success, got {success_count}"
assert conflict_count == 9, f"Expected 9 conflicts, got {conflict_count}"
# No 500 errors
assert all(isinstance(r, Exception) or r.status_code in [200, 409] for r in results), "Some requests returned unexpected status codes"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_tool_update_same_tool(client: AsyncClient):
"""Test concurrent tool updates on the same tool are serialized by row locking."""
# Create a tool with public visibility to ensure permissions work
tool_name = f"update-test-tool-{uuid.uuid4()}"
tool_data = {"name": tool_name, "url": "http://example.com/tool", "description": "Update test tool", "integrationType": "REST", "requestType": "GET", "visibility": "public"}
resp = await client.post("/admin/tools", data=tool_data, headers=TEST_AUTH_HEADER)
assert resp.status_code == 200
# Get tool ID
list_resp = await client.get("/admin/tools", headers=TEST_AUTH_HEADER)
assert list_resp.status_code == 200
tools = list_resp.json()["data"]
tool = next((t for t in tools if t["name"] == tool_name), None)
assert tool is not None
tool_id = tool["id"]
async def update_description():
"""Update description - row locking ensures updates are serialized."""
update_data = {
"name": tool_name,
"customName": tool_name,
"url": "http://example.com/tool",
"description": f"Updated at {uuid.uuid4()}",
"requestType": "GET",
"integrationType": "REST",
"headers": "{}",
"input_schema": "{}",
}
return await client.post(f"/admin/tools/{tool_id}/edit", data=update_data, headers=TEST_AUTH_HEADER)
# Run concurrent updates on the same tool
results = await asyncio.gather(*[update_description() for _ in range(5)], return_exceptions=True)
# All updates should succeed (row locking serializes them, no conflicts on description changes)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code in [200, 303])
# All should succeed since we're just updating descriptions
assert success_count == 5, f"Expected 5 successes, got {success_count}. Results: {[(r.status_code if not isinstance(r, Exception) else str(r)) for r in results]}"
# No errors
assert all(
isinstance(r, Exception) or r.status_code in [200, 303] for r in results
), f"Some requests returned unexpected status codes: {[(r.status_code if not isinstance(r, Exception) else str(r)) for r in results]}"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_tool_delete_operations(client: AsyncClient):
"""Test concurrent delete operations with atomic DELETE ... RETURNING.
With the atomic DELETE ... RETURNING implementation, exactly one delete should
succeed. All other concurrent deletes will find no row to delete and should
return an error in the redirect URL.
Note: The admin endpoint always returns 303 redirects, so we check the
redirect URL for error messages to distinguish success from failure.
"""
# Create a tool
tool_name = f"delete-tool-{uuid.uuid4()}"
tool_data = {"name": tool_name, "url": "http://example.com/tool", "description": "Delete test tool", "integrationType": "REST", "requestType": "GET", "visibility": "public"}
resp = await client.post("/admin/tools", data=tool_data, headers=TEST_AUTH_HEADER)
assert resp.status_code == 200
# Get tool ID
list_resp = await client.get("/admin/tools", headers=TEST_AUTH_HEADER)
assert list_resp.status_code == 200
tools = list_resp.json()["data"]
tool = next((t for t in tools if t["name"] == tool_name), None)
assert tool is not None
tool_id = tool["id"]
async def delete_tool():
return await client.post(f"/admin/tools/{tool_id}/delete", data={}, headers=TEST_AUTH_HEADER, follow_redirects=False)
# Run concurrent deletes
results = await asyncio.gather(*[delete_tool() for _ in range(5)], return_exceptions=True)
# All should return 303 redirects (admin endpoint always redirects)
assert all(
not isinstance(r, Exception) and r.status_code == 303 for r in results
), f"Expected all 303 redirects, got: {[(r.status_code if not isinstance(r, Exception) else str(r)) for r in results]}"
# Count successes (no error in redirect URL) vs failures (error in redirect URL)
success_count = sum(1 for r in results if not isinstance(r, Exception) and "error=" not in r.headers.get("location", ""))
error_count = sum(1 for r in results if not isinstance(r, Exception) and "error=" in r.headers.get("location", ""))
# With atomic DELETE ... RETURNING, exactly one should succeed
assert success_count == 1, f"Expected exactly 1 success, got {success_count}. Redirect URLs: {[r.headers.get('location', '') for r in results if not isinstance(r, Exception)]}"
# The rest should have errors (tool not found)
assert error_count == 4, f"Expected 4 errors, got {error_count}. Redirect URLs: {[r.headers.get('location', '') for r in results if not isinstance(r, Exception)]}"
# Verify tool is actually deleted
final_list = await client.get("/admin/tools", headers=TEST_AUTH_HEADER)
assert final_list.status_code == 200
final_tools = final_list.json()["data"]
assert not any(t["id"] == tool_id for t in final_tools), "Tool should be deleted"
# -------------------------
# Gateway Row Locking Tests
# -------------------------
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_gateway_toggle(client: AsyncClient):
"""Test concurrent gateway enable/disable doesn't cause race condition."""
# Create a gateway with unique URL to avoid conflicts
unique_id = uuid.uuid4()
gateway_name = f"Toggle Gateway {unique_id}"
gateway_data = {"name": gateway_name, "url": f"http://example.com/gateway-{unique_id}", "description": "Toggle test gateway", "visibility": "public", "transport": "SSE"}
resp = await client.post("/admin/gateways", data=gateway_data, headers=TEST_AUTH_HEADER)
assert resp.status_code == 200, f"Gateway creation failed with status {resp.status_code}: {resp.text}"
# Get gateway ID
list_resp = await client.get("/admin/gateways", headers=TEST_AUTH_HEADER)
assert list_resp.status_code == 200
gateways = list_resp.json()["data"]
gateway = next((g for g in gateways if g["name"] == gateway_name), None)
assert gateway is not None
gateway_id = gateway["id"]
async def toggle():
return await client.post(f"/admin/gateways/{gateway_id}/toggle", data={}, headers=TEST_AUTH_HEADER)
# Run 20 concurrent toggles
results = await asyncio.gather(*[toggle() for _ in range(20)], return_exceptions=True)
# All should complete without 500 errors
assert all(isinstance(r, Exception) or r.status_code in [200, 303, 404, 409] for r in results), "Some requests returned unexpected status codes"
# Verify final state is consistent
final_resp = await client.get("/admin/gateways", headers=TEST_AUTH_HEADER)
assert final_resp.status_code == 200
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_team_gateway_creation_same_slug(client: AsyncClient):
"""Test concurrent team gateway creation with same slug prevents duplicates within team."""
gateway_name = f"Team Gateway {uuid.uuid4()}"
async def create_team_gateway():
gateway_data = {"name": gateway_name, "url": "http://example.com/gateway", "description": "Team test gateway", "visibility": "team", "transport": "SSE"}
return await client.post("/admin/gateways", data=gateway_data, headers=TEST_AUTH_HEADER)
# Run 10 concurrent creations with same name (will generate same slug)
results = await asyncio.gather(*[create_team_gateway() for _ in range(10)], return_exceptions=True)
# Count successful creations and conflicts
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
conflict_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 409)
error_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code >= 500)
# Exactly one should succeed, rest should be conflicts
assert success_count == 1, f"Expected 1 success, got {success_count}"
assert conflict_count >= 8, f"Expected at least 8 conflicts, got {conflict_count}"
assert error_count == 0, f"Expected no 500 errors, got {error_count}"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_gateway_delete_operations(client: AsyncClient):
"""Test concurrent delete operations with atomic DELETE ... RETURNING.
With the atomic DELETE ... RETURNING implementation, exactly one delete should
succeed. All other concurrent deletes will find no row to delete and should
return an error in the redirect URL.
Note: The admin endpoint always returns 303 redirects, so we check the
redirect URL for error messages to distinguish success from failure.
"""
# Create a gateway with unique URL to avoid conflicts
unique_id = uuid.uuid4()
gateway_name = f"Delete Gateway {unique_id}"
gateway_data = {"name": gateway_name, "url": f"http://example.com/gateway-{unique_id}", "description": "Delete test gateway", "visibility": "public", "transport": "SSE"}
resp = await client.post("/admin/gateways", data=gateway_data, headers=TEST_AUTH_HEADER)
assert resp.status_code == 200, f"Gateway creation failed with status {resp.status_code}: {resp.text}"
# Get gateway ID
list_resp = await client.get("/admin/gateways", headers=TEST_AUTH_HEADER)
assert list_resp.status_code == 200
gateways = list_resp.json()["data"]
gateway = next((g for g in gateways if g["name"] == gateway_name), None)
assert gateway is not None, f"Gateway not found in list. Available gateways: {[g['name'] for g in gateways]}"
gateway_id = gateway["id"]
async def delete_gateway():
return await client.post(f"/admin/gateways/{gateway_id}/delete", data={}, headers=TEST_AUTH_HEADER, follow_redirects=False)
# Run concurrent deletes
results = await asyncio.gather(*[delete_gateway() for _ in range(5)], return_exceptions=True)
# All should return 303 redirects (admin endpoint always redirects)
assert all(
not isinstance(r, Exception) and r.status_code == 303 for r in results
), f"Expected all 303 redirects, got: {[(r.status_code if not isinstance(r, Exception) else str(r)) for r in results]}"
# Count successes (no error in redirect URL) vs failures (error in redirect URL)
success_count = sum(1 for r in results if not isinstance(r, Exception) and "error=" not in r.headers.get("location", ""))
error_count = sum(1 for r in results if not isinstance(r, Exception) and "error=" in r.headers.get("location", ""))
# With atomic DELETE ... RETURNING, at least one should succeed
# Note: In some cases, gateway deletion might fail due to initialization issues
# so we check that at least some deletes work correctly
assert success_count >= 1, f"Expected at least 1 success, got {success_count}. Redirect URLs: {[r.headers.get('location', '') for r in results if not isinstance(r, Exception)]}"
# If one succeeded, the rest should have errors (gateway not found)
if success_count == 1:
assert error_count == 4, f"Expected 4 errors when 1 succeeds, got {error_count}. Redirect URLs: {[r.headers.get('location', '') for r in results if not isinstance(r, Exception)]}"
# Verify gateway is actually deleted
final_list = await client.get("/admin/gateways", headers=TEST_AUTH_HEADER)
assert final_list.status_code == 200
final_gateways = final_list.json()["data"]
assert not any(g["id"] == gateway_id for g in final_gateways), "Gateway should be deleted"
# -------------------------
# Skip-Locked Behavior Tests
# -------------------------
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_skip_locked_behavior_tool_updates(client: AsyncClient):
"""Test that skip_locked allows concurrent operations to proceed without blocking."""
# Create multiple tools
tool_ids = []
for i in range(5):
tool_name = f"skip-lock-tool-{i}-{uuid.uuid4()}"
tool_data = {
"name": tool_name,
"url": f"http://example.com/tool{i}",
"description": f"Skip lock test tool {i}",
"integrationType": "REST",
"requestType": "GET",
"visibility": "public",
"headers": "{}",
"input_schema": "{}",
}
resp = await client.post("/admin/tools", data=tool_data, headers=TEST_AUTH_HEADER)
assert resp.status_code == 200, f"Failed to create tool {i}: {resp.status_code} - {resp.text[:200]}"
# Get tool ID
list_resp = await client.get("/admin/tools", headers=TEST_AUTH_HEADER)
tools = list_resp.json()["data"]
tool = next((t for t in tools if t["name"] == tool_name), None)
if tool:
tool_ids.append(tool["id"])
async def update_tool(tool_id: str, index: int):
tool_name = f"updated-tool-{index}-{uuid.uuid4()}"
update_data = {
"name": tool_name,
"customName": tool_name,
"url": f"http://example.com/updated{index}",
"requestType": "GET",
"integrationType": "REST",
"headers": "{}",
"input_schema": "{}",
"description": f"Updated description {index}",
}
return await client.post(f"/admin/tools/{tool_id}/edit", data=update_data, headers=TEST_AUTH_HEADER)
# Update all tools concurrently
results = await asyncio.gather(*[update_tool(tool_id, i) for i, tool_id in enumerate(tool_ids)], return_exceptions=True)
# Debug: Print all responses
for i, r in enumerate(results):
if isinstance(r, Exception):
print(f"Tool {i} - Exception: {r}")
else:
print(f"Tool {i} - Status: {r.status_code}, Body: {r.text[:200] if hasattr(r, 'text') else 'N/A'}")
# All should succeed (different tools, skip_locked allows parallel processing)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code in [200, 303])
assert success_count == len(tool_ids), f"Expected {len(tool_ids)} successes, got {success_count}. Results: {[(r.status_code if not isinstance(r, Exception) else str(r)) for r in results]}"
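# Sketch of the FOR UPDATE SKIP LOCKED behaviour this test relies on (illustrative
# only; DbTool is an assumed model name). SKIP LOCKED lets a transaction skip rows
# another transaction already holds, so updates to *different* rows never queue
# behind each other:
#
#     rows = db.execute(
#         select(DbTool).where(DbTool.id.in_(candidate_ids)).with_for_update(skip_locked=True)
#     ).scalars().all()  # returns only rows not currently locked by another transaction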
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_mixed_visibility_concurrent_operations(client: AsyncClient):
"""Test concurrent operations with same name - DB constraint enforces uniqueness by team_id+owner_email+name."""
base_uuid = uuid.uuid4()
async def create_public_tool():
# Use unique name per visibility to avoid constraint violation
form_data = {
"name": f"mixed-vis-public-{base_uuid}",
"url": "http://example.com/public",
"description": "Public tool",
"integrationType": "REST",
"requestType": "GET",
"visibility": "public",
"headers": "{}",
"input_schema": "{}",
}
return await client.post("/admin/tools", data=form_data, headers=TEST_AUTH_HEADER)
async def create_team_tool():
form_data = {
"name": f"mixed-vis-team-{base_uuid}",
"url": "http://example.com/team",
"description": "Team tool",
"integrationType": "REST",
"requestType": "GET",
"visibility": "team",
"headers": "{}",
"input_schema": "{}",
}
return await client.post("/admin/tools", data=form_data, headers=TEST_AUTH_HEADER)
async def create_private_tool():
form_data = {
"name": f"mixed-vis-private-{base_uuid}",
"url": "http://example.com/private",
"description": "Private tool",
"integrationType": "REST",
"requestType": "GET",
"visibility": "private",
"headers": "{}",
"input_schema": "{}",
}
return await client.post("/admin/tools", data=form_data, headers=TEST_AUTH_HEADER)
# Create tools with different names and visibility concurrently
# Each visibility type has 3 concurrent requests with the same name
results = await asyncio.gather(*[create_public_tool() for _ in range(3)], *[create_team_tool() for _ in range(3)], *[create_private_tool() for _ in range(3)], return_exceptions=True)
# Should have one success per visibility type (3 total) due to DB constraint on team_id+owner_email+name
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
conflict_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 409)
# Expect 3 successes (one per unique name) and 6 conflicts (2 per name)
assert success_count == 3, f"Expected 3 successes (one per unique name), got {success_count}"
assert conflict_count == 6, f"Expected 6 conflicts, got {conflict_count}"
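# The uniqueness rule exercised above, roughly as it would appear on a SQLAlchemy
# model (a sketch; the constraint and column names are assumptions, not necessarily
# the gateway's schema):
#
#     __table_args__ = (
#         UniqueConstraint("team_id", "owner_email", "name", name="uq_tool_team_owner_name"),
#     )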
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_high_concurrency_gateway_creation(client: AsyncClient):
"""Test high concurrency with many unique gateway creations."""
async def create_unique_gateway(index: int):
gateway_data = {
"name": f"Concurrent Gateway {index} {uuid.uuid4()}",
"url": f"http://example{index}.com/gateway",
"description": f"Concurrent test gateway {index}",
"visibility": "public",
"transport": "SSE",
}
return await client.post("/admin/gateways", data=gateway_data, headers=TEST_AUTH_HEADER)
# Create 30 gateways concurrently
results = await asyncio.gather(*[create_unique_gateway(i) for i in range(30)], return_exceptions=True)
# All should succeed (different names/slugs)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
failed_count = sum(1 for r in results if isinstance(r, Exception) or (hasattr(r, "status_code") and r.status_code != 200))
error_500_count = sum(1 for r in results if not isinstance(r, Exception) and hasattr(r, "status_code") and r.status_code >= 500)
# Allow for occasional failures due to connection pool exhaustion under extreme concurrency
# At least 28 out of 30 should succeed (93% success rate)
assert success_count >= 28, f"Expected at least 28 successes, got {success_count} (failed: {failed_count})"
# No 500 errors - those indicate server bugs, not resource exhaustion
assert error_500_count == 0, f"Got {error_500_count} server errors (500+), which indicates bugs not resource limits"
# ============================================================================
# PROMPT CONCURRENCY TESTS
# ============================================================================
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_prompt_creation_same_name(client: AsyncClient):
"""Test concurrent prompt creation with same name prevents duplicates."""
prompt_name = f"test-prompt-{uuid.uuid4()}"
async def create_prompt():
form_data = {"name": prompt_name, "description": "Test prompt", "template": "Test template", "arguments": "[]", "visibility": "public"}
return await client.post("/admin/prompts", data=form_data, headers=TEST_AUTH_HEADER)
# Run 10 concurrent creations with same name
results = await asyncio.gather(*[create_prompt() for _ in range(10)], return_exceptions=True)
# Count successful creations (200) and conflicts (409)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
conflict_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 409)
# Exactly one should succeed, rest should be conflicts
assert success_count == 1, f"Expected 1 success, got {success_count}"
assert conflict_count == 9, f"Expected 9 conflicts, got {conflict_count}"
# No 500 errors
assert all(isinstance(r, Exception) or r.status_code in [200, 409] for r in results), "Some requests returned unexpected status codes"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_prompt_update_same_name(client: AsyncClient):
"""Test concurrent prompt updates to same name prevents duplicates."""
# Create two prompts
prompt1_name = f"prompt-1-{uuid.uuid4()}"
prompt2_name = f"prompt-2-{uuid.uuid4()}"
prompt1_data = {"name": prompt1_name, "description": "Prompt 1", "template": "Template 1", "arguments": "[]", "visibility": "public"}
prompt2_data = {"name": prompt2_name, "description": "Prompt 2", "template": "Template 2", "arguments": "[]", "visibility": "public"}
resp1 = await client.post("/admin/prompts", data=prompt1_data, headers=TEST_AUTH_HEADER)
resp2 = await client.post("/admin/prompts", data=prompt2_data, headers=TEST_AUTH_HEADER)
assert resp1.status_code == 200
assert resp2.status_code == 200
# Get prompt IDs
list_resp = await client.get("/prompts", headers=TEST_AUTH_HEADER)
assert list_resp.status_code == 200
prompts = list_resp.json() # Returns list directly, not {"data": [...]}
prompt1 = next((p for p in prompts if p["name"] == prompt1_name), None)
prompt2 = next((p for p in prompts if p["name"] == prompt2_name), None)
assert prompt1 is not None and prompt2 is not None
prompt1_id = prompt1["id"]
prompt2_id = prompt2["id"]
target_name = f"target-prompt-{uuid.uuid4()}"
async def update_prompt(prompt_id: str):
update_data = {"name": target_name, "description": "Updated prompt", "template": "Updated template", "arguments": "[]"}
return await client.post(f"/admin/prompts/{prompt_id}/edit", data=update_data, headers=TEST_AUTH_HEADER)
# Try to update both prompts to same name concurrently
results = await asyncio.gather(*[update_prompt(prompt1_id), update_prompt(prompt2_id)], return_exceptions=True)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
_conflict_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 409) # noqa: F841
# At least one should succeed
assert success_count >= 1, f"Expected at least 1 success, got {success_count}"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_prompt_toggle(client: AsyncClient):
"""Test concurrent enable/disable doesn't cause race condition."""
prompt_name = f"toggle-prompt-{uuid.uuid4()}"
prompt_data = {"name": prompt_name, "description": "Toggle test prompt", "template": "Toggle template", "arguments": "[]", "visibility": "public"}
resp = await client.post("/admin/prompts", data=prompt_data, headers=TEST_AUTH_HEADER)
assert resp.status_code == 200
# Get prompt ID
list_resp = await client.get("/prompts", headers=TEST_AUTH_HEADER)
assert list_resp.status_code == 200
prompts = list_resp.json() # Returns list directly, not {"data": [...]}
prompt = next((p for p in prompts if p["name"] == prompt_name), None)
assert prompt is not None
prompt_id = prompt["id"]
async def toggle():
return await client.post(f"/admin/prompts/{prompt_id}/toggle", data={}, headers=TEST_AUTH_HEADER)
# Run 20 concurrent toggles
results = await asyncio.gather(*[toggle() for _ in range(20)], return_exceptions=True)
# All should succeed or fail cleanly (no 500 errors)
assert all(isinstance(r, Exception) or r.status_code in [200, 303, 404, 409] for r in results), "Some requests returned unexpected status codes"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_prompt_delete_operations(client: AsyncClient):
"""Test concurrent delete operations handle race conditions correctly."""
prompt_name = f"delete-prompt-{uuid.uuid4()}"
prompt_data = {"name": prompt_name, "description": "Delete test prompt", "template": "Delete template", "arguments": "[]", "visibility": "public"}
resp = await client.post("/admin/prompts", data=prompt_data, headers=TEST_AUTH_HEADER)
assert resp.status_code == 200
# Get prompt ID
list_resp = await client.get("/prompts", headers=TEST_AUTH_HEADER)
assert list_resp.status_code == 200
prompts = list_resp.json() # Returns list directly, not {"data": [...]}
prompt = next((p for p in prompts if p["name"] == prompt_name), None)
assert prompt is not None
prompt_id = prompt["id"]
async def delete_prompt():
return await client.post(f"/admin/prompts/{prompt_id}/delete", data={}, headers=TEST_AUTH_HEADER)
# Run 10 concurrent deletes
results = await asyncio.gather(*[delete_prompt() for _ in range(10)], return_exceptions=True)
# Admin delete always returns 303, but with error in URL if failed
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 303 and "error=" not in r.headers.get("location", ""))
error_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 303 and "error=" in r.headers.get("location", ""))
# Exactly one should succeed (admin delete returns 303 redirect without error)
assert success_count == 1, f"Expected 1 successful delete, got {success_count}"
assert error_count == 9, f"Expected 9 failed deletes with error, got {error_count}"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_high_concurrency_prompt_creation(client: AsyncClient):
"""Test high concurrency with unique prompts."""
async def create_unique_prompt(index: int):
prompt_data = {"name": f"prompt-{uuid.uuid4()}-{index}", "description": f"Prompt {index}", "template": f"Template {index}", "arguments": "[]", "visibility": "public"}
return await client.post("/admin/prompts", data=prompt_data, headers=TEST_AUTH_HEADER)
# Create 50 unique prompts concurrently
results = await asyncio.gather(*[create_unique_prompt(i) for i in range(50)], return_exceptions=True)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
failed_count = sum(1 for r in results if isinstance(r, Exception) or (hasattr(r, "status_code") and r.status_code != 200))
# Allow for occasional failures due to connection pool exhaustion under extreme concurrency
# At least 48 out of 50 should succeed (96% success rate)
assert success_count >= 48, f"Expected at least 48 successful creations, got {success_count} (failed: {failed_count})"
# ============================================================================
# RESOURCE CONCURRENCY TESTS
# ============================================================================
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_resource_update_same_uri(client: AsyncClient):
"""Test concurrent resource updates to same URI prevents duplicates."""
# Create two resources
resource1_uri = f"file:///resource-1-{uuid.uuid4()}.txt"
resource2_uri = f"file:///resource-2-{uuid.uuid4()}.txt"
resource1_data = {
"resource": {"uri": resource1_uri, "name": "Resource 1", "description": "Resource 1", "mimeType": "text/plain", "content": "Resource 1 content"},
"team_id": None,
"visibility": "public",
}
resource2_data = {
"resource": {"uri": resource2_uri, "name": "Resource 2", "description": "Resource 2", "mimeType": "text/plain", "content": "Resource 2 content"},
"team_id": None,
"visibility": "public",
}
resp1 = await client.post("/resources", json=resource1_data, headers=TEST_AUTH_HEADER)
resp2 = await client.post("/resources", json=resource2_data, headers=TEST_AUTH_HEADER)
assert resp1.status_code == 200, f"Failed to create resource 1: {resp1.status_code} - {resp1.text}"
assert resp2.status_code == 200, f"Failed to create resource 2: {resp2.status_code} - {resp2.text}"
# Get resource IDs
list_resp = await client.get("/resources", headers=TEST_AUTH_HEADER)
assert list_resp.status_code == 200
resources = list_resp.json() # Returns list directly, not wrapped in {"data": [...]}
resource1 = next((r for r in resources if r["uri"] == resource1_uri), None)
resource2 = next((r for r in resources if r["uri"] == resource2_uri), None)
assert resource1 is not None, f"Resource 1 not found in list: {[r['uri'] for r in resources]}"
assert resource2 is not None, f"Resource 2 not found in list: {[r['uri'] for r in resources]}"
resource1_id = resource1["id"]
resource2_id = resource2["id"]
target_uri = f"file:///target-resource-{uuid.uuid4()}.txt"
async def update_resource(resource_id: str):
update_data = {"uri": target_uri, "name": "Updated Resource", "description": "Updated resource", "mimeType": "text/plain"}
return await client.put(f"/resources/{resource_id}", json=update_data, headers=TEST_AUTH_HEADER)
# Try to update both resources to same URI concurrently
results = await asyncio.gather(*[update_resource(resource1_id), update_resource(resource2_id)], return_exceptions=True)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
conflict_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 409)
# Exactly one should succeed, one should conflict
assert success_count == 1, f"Expected 1 success, got {success_count}"
assert conflict_count == 1, f"Expected 1 conflict, got {conflict_count}"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_resource_toggle(client: AsyncClient):
"""Test concurrent enable/disable doesn't cause race condition."""
resource_uri = f"file:///toggle-resource-{uuid.uuid4()}.txt"
resource_data = {
"resource": {"uri": resource_uri, "name": "Toggle Resource", "description": "Toggle test resource", "mimeType": "text/plain", "content": "Toggle resource content"},
"team_id": None,
"visibility": "public",
}
resp = await client.post("/resources", json=resource_data, headers=TEST_AUTH_HEADER)
assert resp.status_code == 200, f"Failed to create resource: {resp.status_code} - {resp.text}"
# Get resource ID
list_resp = await client.get("/resources", headers=TEST_AUTH_HEADER)
assert list_resp.status_code == 200
resources = list_resp.json() # Returns list directly
resource = next((r for r in resources if r["uri"] == resource_uri), None)
assert resource is not None, f"Resource not found in list: {[r['uri'] for r in resources]}"
resource_id = resource["id"]
async def toggle():
return await client.post(f"/resources/{resource_id}/toggle", json={}, headers=TEST_AUTH_HEADER)
# Run 20 concurrent toggles
results = await asyncio.gather(*[toggle() for _ in range(20)], return_exceptions=True)
# All should succeed or fail cleanly (no 500 errors)
assert all(isinstance(r, Exception) or r.status_code in [200, 303, 404, 409] for r in results), "Some requests returned unexpected status codes"
# Verify final state is consistent
final_resp = await client.get("/resources", headers=TEST_AUTH_HEADER)
assert final_resp.status_code == 200
final_resources = final_resp.json() # Returns list directly
final_resource = next((r for r in final_resources if r["uri"] == resource_uri), None)
assert final_resource is not None, f"Resource not found after toggles: {[r['uri'] for r in final_resources]}"
# Resource should be in a valid state (either enabled or disabled)
assert isinstance(final_resource["enabled"], bool)
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_skip_locked_behavior_resource_updates(client: AsyncClient):
"""Test that skip_locked allows concurrent resource operations to proceed without blocking."""
# Create multiple resources
resource_ids = []
for i in range(5):
resource_uri = f"file:///skip-lock-resource-{i}-{uuid.uuid4()}.txt"
resource_data = {
"resource": {"uri": resource_uri, "name": f"Skip lock resource {i}", "description": f"Skip lock test resource {i}", "mimeType": "text/plain", "content": f"Skip lock resource {i} content"},
"team_id": None,
"visibility": "public",
}
resp = await client.post("/resources", json=resource_data, headers=TEST_AUTH_HEADER)
assert resp.status_code == 200, f"Failed to create resource {i}: {resp.status_code} - {resp.text}"
# Get resource ID
list_resp = await client.get("/resources", headers=TEST_AUTH_HEADER)
resources = list_resp.json() # Returns list directly
resource = next((r for r in resources if r["uri"] == resource_uri), None)
if resource:
resource_ids.append(resource["id"])
async def update_resource(resource_id: str, index: int):
resource_uri = f"file:///updated-resource-{index}-{uuid.uuid4()}.txt"
update_data = {"uri": resource_uri, "name": f"Updated resource {index}", "description": f"Updated description {index}", "mimeType": "text/plain"}
return await client.put(f"/resources/{resource_id}", json=update_data, headers=TEST_AUTH_HEADER)
# Update all resources concurrently
results = await asyncio.gather(*[update_resource(resource_id, i) for i, resource_id in enumerate(resource_ids)], return_exceptions=True)
# All should succeed (different resources, skip_locked allows parallel processing)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
assert success_count == len(resource_ids), f"Expected {len(resource_ids)} successes, got {success_count}"
# ============================================================================
# A2A AGENT CONCURRENCY TESTS
# ============================================================================
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_a2a_creation_same_name(client: AsyncClient):
"""Test concurrent A2A agent creation with same name prevents duplicates."""
agent_name = f"test-agent-{uuid.uuid4()}"
async def create_agent():
agent_data = {"agent": {"name": agent_name, "description": "Test agent", "endpoint_url": "http://example.com/agent"}, "team_id": None, "visibility": "public"}
return await client.post("/a2a", json=agent_data, headers=TEST_AUTH_HEADER)
# Run 10 concurrent creations with same name
results = await asyncio.gather(*[create_agent() for _ in range(10)], return_exceptions=True)
# Debug: Print all responses
for i, r in enumerate(results):
if isinstance(r, Exception):
print(f"Result {i}: Exception - {type(r).__name__}: {r}")
else:
print(f"Result {i}: Status {r.status_code}")
if r.status_code not in [201, 409]:
print(f" Body: {r.text[:500]}")
# Count successful creations (201) and conflicts (409)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 201)
conflict_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 409)
error_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code not in [201, 409])
exception_count = sum(1 for r in results if isinstance(r, Exception))
print(f"\nSummary: {success_count} success, {conflict_count} conflicts, {error_count} errors, {exception_count} exceptions")
# Exactly one should succeed, rest should be conflicts
assert success_count == 1, f"Expected 1 success, got {success_count}"
assert conflict_count == 9, f"Expected 9 conflicts, got {conflict_count}"
# No 500 errors
assert all(isinstance(r, Exception) or r.status_code in [201, 409] for r in results), "Some requests returned unexpected status codes"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_a2a_update_same_name(client: AsyncClient):
"""Test concurrent A2A agent updates to same name prevents duplicates."""
# Create two agents
agent1_name = f"agent-1-{uuid.uuid4()}"
agent2_name = f"agent-2-{uuid.uuid4()}"
agent1_data = {"agent": {"name": agent1_name, "description": "Agent 1", "endpoint_url": "http://example.com/agent1"}, "team_id": None, "visibility": "public"}
agent2_data = {"agent": {"name": agent2_name, "description": "Agent 2", "endpoint_url": "http://example.com/agent2"}, "team_id": None, "visibility": "public"}
resp1 = await client.post("/a2a", json=agent1_data, headers=TEST_AUTH_HEADER)
resp2 = await client.post("/a2a", json=agent2_data, headers=TEST_AUTH_HEADER)
assert resp1.status_code == 201, f"Failed to create agent 1: {resp1.status_code} - {resp1.text}"
assert resp2.status_code == 201, f"Failed to create agent 2: {resp2.status_code} - {resp2.text}"
# Get agent IDs
list_resp = await client.get("/a2a", headers=TEST_AUTH_HEADER)
assert list_resp.status_code == 200
agents = list_resp.json()
if isinstance(agents, dict) and "agents" in agents:
agents = agents["agents"]
agent1 = next((a for a in agents if a["name"] == agent1_name), None)
agent2 = next((a for a in agents if a["name"] == agent2_name), None)
assert agent1 is not None and agent2 is not None
agent1_id = agent1["id"]
agent2_id = agent2["id"]
target_name = f"target-agent-{uuid.uuid4()}"
async def update_agent(agent_id: str):
update_data = {"name": target_name, "description": "Updated agent", "endpoint": "http://example.com/updated"}
return await client.put(f"/a2a/{agent_id}", json=update_data, headers=TEST_AUTH_HEADER)
# Try to update both agents to same name concurrently
results = await asyncio.gather(*[update_agent(agent1_id), update_agent(agent2_id)], return_exceptions=True)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
_conflict_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 409)  # noqa: F841
# At least one should succeed
assert success_count >= 1, f"Expected at least 1 success, got {success_count}"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_a2a_toggle(client: AsyncClient):
"""Test concurrent enable/disable doesn't cause race condition."""
agent_name = f"toggle-agent-{uuid.uuid4()}"
agent_data = {"agent": {"name": agent_name, "description": "Toggle test agent", "endpoint_url": "http://example.com/agent"}, "team_id": None, "visibility": "public"}
resp = await client.post("/a2a", json=agent_data, headers=TEST_AUTH_HEADER)
assert resp.status_code == 201, f"Failed to create agent: {resp.status_code} - {resp.text}"
# Get agent ID
list_resp = await client.get("/a2a", headers=TEST_AUTH_HEADER)
assert list_resp.status_code == 200
agents = list_resp.json()
if isinstance(agents, dict) and "agents" in agents:
agents = agents["agents"]
agent = next((a for a in agents if a["name"] == agent_name), None)
assert agent is not None
agent_id = agent["id"]
async def toggle():
return await client.post(f"/a2a/{agent_id}/toggle", json={"activate": True}, headers=TEST_AUTH_HEADER)
# Run 20 concurrent toggles
results = await asyncio.gather(*[toggle() for _ in range(20)], return_exceptions=True)
# All should succeed or fail cleanly (no 500 errors)
assert all(isinstance(r, Exception) or r.status_code in [200, 303, 404, 409] for r in results), "Some requests returned unexpected status codes"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_a2a_delete_operations(client: AsyncClient):
"""Test concurrent delete operations handle race conditions correctly."""
agent_name = f"delete-agent-{uuid.uuid4()}"
agent_data = {"agent": {"name": agent_name, "description": "Delete test agent", "endpoint_url": "http://example.com/agent"}, "team_id": None, "visibility": "public"}
resp = await client.post("/a2a", json=agent_data, headers=TEST_AUTH_HEADER)
assert resp.status_code == 201, f"Failed to create agent: {resp.status_code} - {resp.text}"
# Get agent ID
list_resp = await client.get("/a2a", headers=TEST_AUTH_HEADER)
assert list_resp.status_code == 200
agents = list_resp.json()
if isinstance(agents, dict) and "agents" in agents:
agents = agents["agents"]
agent = next((a for a in agents if a["name"] == agent_name), None)
assert agent is not None
agent_id = agent["id"]
async def delete_agent():
return await client.delete(f"/a2a/{agent_id}", headers=TEST_AUTH_HEADER)
# Run 10 concurrent deletes
results = await asyncio.gather(*[delete_agent() for _ in range(10)], return_exceptions=True)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code in [200, 204])
error_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 404)
# Exactly one should succeed
assert success_count == 1, f"Expected 1 successful delete, got {success_count}"
assert error_count == 9, f"Expected 9 not found errors, got {error_count}"
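# The delete test above expects exactly one winner. One way to decide the race is to
# let the DELETE's rowcount be the tie-breaker, as sketched below; the model class is
# a placeholder and mapping the losing requests to 404 is left to the caller.
def _delete_exactly_once(session, model, entity_id):
    """Illustrative delete where rowcount tells the caller whether it won the race."""
    from sqlalchemy import delete
    result = session.execute(delete(model).where(model.id == entity_id))
    session.commit()
    # rowcount == 1 -> this request removed the row; 0 -> another request already had.
    return result.rowcount == 1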
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_high_concurrency_a2a_creation(client: AsyncClient):
"""Test high concurrency with unique A2A agents."""
async def create_unique_agent(index: int):
agent_data = {"name": f"agent-{uuid.uuid4()}-{index}", "description": f"Agent {index}", "endpoint_url": f"http://example.com/agent{index}"}
return await client.post("/a2a", json={"agent": agent_data, "visibility": "public"}, headers=TEST_AUTH_HEADER)
# Create 50 unique agents concurrently
results = await asyncio.gather(*[create_unique_agent(i) for i in range(50)], return_exceptions=True)
# Debug: Print sample responses
for i, r in enumerate(results[:3]): # Print first 3 for debugging
if isinstance(r, Exception):
print(f"Result {i}: Exception - {type(r).__name__}: {r}")
else:
print(f"Result {i}: Status {r.status_code}")
if r.status_code != 201:
print(f" Body: {r.text[:500]}")
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 201)
error_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code not in [201])
exception_count = sum(1 for r in results if isinstance(r, Exception))
print(f"\nSummary: {success_count} success, {error_count} errors, {exception_count} exceptions")
# Allow for occasional failures due to connection pool exhaustion under extreme concurrency
# At least 48 out of 50 should succeed (96% success rate)
assert success_count >= 48, f"Expected at least 48 successful creations, got {success_count}"
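# The assertion above tolerates a couple of failures caused by connection pool
# exhaustion under extreme concurrency. A variant of these tests could avoid that by
# bounding the number of in-flight requests, as sketched below; the helper and its
# limit of 10 are illustrative and are not used by the tests in this file.
async def _gather_bounded(coro_factories, limit=10):
    """Run zero-argument coroutine factories with at most `limit` of them in flight."""
    semaphore = asyncio.Semaphore(limit)
    async def run_one(factory):
        async with semaphore:
            return await factory()
    return await asyncio.gather(*[run_one(factory) for factory in coro_factories], return_exceptions=True)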
# ============================================================================
# SERVER CONCURRENCY TESTS
# ============================================================================
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_server_creation_same_name(client: AsyncClient):
"""Test concurrent server creation with same name prevents duplicates."""
server_name = f"test-server-{uuid.uuid4()}"
async def create_server():
server_data = {"name": server_name, "description": "Test server", "transport": "sse", "url": "http://example.com/sse"}
return await client.post("/servers", json={"server": server_data, "visibility": "public"}, headers=TEST_AUTH_HEADER)
# Run 10 concurrent creations with same name
results = await asyncio.gather(*[create_server() for _ in range(10)], return_exceptions=True)
# Debug: Print sample responses
for i, r in enumerate(results[:3]):
if isinstance(r, Exception):
print(f"Result {i}: Exception - {type(r).__name__}: {r}")
else:
print(f"Result {i}: Status {r.status_code}")
if r.status_code not in [201, 409]:
print(f" Body: {r.text[:500]}")
# Count successful creations (201) and conflicts (409)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 201)
conflict_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 409)
error_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code not in [201, 409])
exception_count = sum(1 for r in results if isinstance(r, Exception))
print(f"\nSummary: {success_count} success, {conflict_count} conflicts, {error_count} errors, {exception_count} exceptions")
# Exactly one should succeed, rest should be conflicts
assert success_count == 1, f"Expected 1 success, got {success_count}"
assert conflict_count == 9, f"Expected 9 conflicts, got {conflict_count}"
# No 500 errors
assert all(isinstance(r, Exception) or r.status_code in [201, 409] for r in results), "Some requests returned unexpected status codes"
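# Getting exactly one 201 and nine 409s above requires a single arbiter for the
# duplicate check. One common pattern is to let a database unique constraint decide
# and translate the resulting integrity error into a conflict, as sketched below; the
# model class and exception mapping are illustrative, not necessarily how mcpgateway
# implements it.
def _create_or_conflict(session, model, **fields):
    """Illustrative create that converts a unique-constraint violation into a conflict."""
    from sqlalchemy.exc import IntegrityError
    session.add(model(**fields))
    try:
        session.commit()  # the constraint, not application code, rejects the duplicate
    except IntegrityError:
        session.rollback()
        return False  # caller maps this to HTTP 409
    return True  # caller maps this to HTTP 201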
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_server_update_same_name(client: AsyncClient):
"""Test concurrent server updates to same name prevents duplicates."""
# Create two servers
server1_name = f"server-1-{uuid.uuid4()}"
server2_name = f"server-2-{uuid.uuid4()}"
server1_data = {"server": {"name": server1_name, "description": "Server 1", "transport": "sse", "url": "http://example.com/sse1"}, "team_id": None, "visibility": "public"}
server2_data = {"server": {"name": server2_name, "description": "Server 2", "transport": "sse", "url": "http://example.com/sse2"}, "team_id": None, "visibility": "public"}
resp1 = await client.post("/servers", json=server1_data, headers=TEST_AUTH_HEADER)
resp2 = await client.post("/servers", json=server2_data, headers=TEST_AUTH_HEADER)
assert resp1.status_code == 201, f"Failed to create server 1: {resp1.status_code} - {resp1.text}"
assert resp2.status_code == 201, f"Failed to create server 2: {resp2.status_code} - {resp2.text}"
# Get server IDs
list_resp = await client.get("/servers", headers=TEST_AUTH_HEADER)
assert list_resp.status_code == 200
servers = list_resp.json() # Returns list directly
server1 = next((s for s in servers if s["name"] == server1_name), None)
server2 = next((s for s in servers if s["name"] == server2_name), None)
assert server1 is not None, f"Server 1 not found in list: {[s['name'] for s in servers]}"
assert server2 is not None, f"Server 2 not found in list: {[s['name'] for s in servers]}"
server1_id = server1["id"]
server2_id = server2["id"]
target_name = f"target-server-{uuid.uuid4()}"
async def update_server(server_id: str):
update_data = {"name": target_name, "description": "Updated server", "transport": "sse", "url": "http://example.com/updated"}
return await client.put(f"/servers/{server_id}", json=update_data, headers=TEST_AUTH_HEADER)
# Try to update both servers to same name concurrently
results = await asyncio.gather(*[update_server(server1_id), update_server(server2_id)], return_exceptions=True)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
conflict_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 409)
# At least one should succeed
assert success_count >= 1, f"Expected at least 1 success, got {success_count}"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_server_toggle(client: AsyncClient):
"""Test concurrent enable/disable doesn't cause race condition."""
server_name = f"toggle-server-{uuid.uuid4()}"
server_data = {"server": {"name": server_name, "description": "Toggle test server", "transport": "sse", "url": "http://example.com/sse"}, "team_id": None, "visibility": "public"}
resp = await client.post("/servers", json=server_data, headers=TEST_AUTH_HEADER)
assert resp.status_code == 201, f"Failed to create server: {resp.status_code} - {resp.text}"
# Get server ID
list_resp = await client.get("/servers", headers=TEST_AUTH_HEADER)
assert list_resp.status_code == 200
servers = list_resp.json() # Returns list directly
server = next((s for s in servers if s["name"] == server_name), None)
assert server is not None, f"Server not found in list: {[s['name'] for s in servers]}"
server_id = server["id"]
async def toggle():
return await client.post(f"/servers/{server_id}/toggle", json={}, headers=TEST_AUTH_HEADER)
# Run 20 concurrent toggles
results = await asyncio.gather(*[toggle() for _ in range(20)], return_exceptions=True)
# All should succeed or fail cleanly (no 500 errors)
assert all(isinstance(r, Exception) or r.status_code in [200, 303, 404, 409] for r in results), "Some requests returned unexpected status codes"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_server_delete_operations(client: AsyncClient):
"""Test concurrent delete operations handle race conditions correctly."""
server_name = f"delete-server-{uuid.uuid4()}"
server_data = {"server": {"name": server_name, "description": "Delete test server", "transport": "sse", "url": "http://example.com/sse"}, "team_id": None, "visibility": "public"}
resp = await client.post("/servers", json=server_data, headers=TEST_AUTH_HEADER)
assert resp.status_code == 201, f"Failed to create server: {resp.status_code} - {resp.text}"
# Get server ID from creation response
created_server = resp.json()
server_id = created_server["id"]
async def delete_server():
return await client.delete(f"/servers/{server_id}", headers=TEST_AUTH_HEADER)
# Run 10 concurrent deletes
results = await asyncio.gather(*[delete_server() for _ in range(10)], return_exceptions=True)
# Debug: Print sample responses
for i, r in enumerate(results[:3]):
if isinstance(r, Exception):
print(f"Result {i}: Exception - {type(r).__name__}: {r}")
else:
print(f"Result {i}: Status {r.status_code}")
if r.status_code not in [200, 204, 404]:
print(f" Body: {r.text[:500]}")
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code in [200, 204])
error_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 404)
other_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code not in [200, 204, 404])
exception_count = sum(1 for r in results if isinstance(r, Exception))
print(f"\nSummary: {success_count} success, {error_count} not found, {other_count} other errors, {exception_count} exceptions")
# Exactly one should succeed
assert success_count == 1, f"Expected 1 successful delete, got {success_count}"
assert error_count == 9, f"Expected 9 not found errors, got {error_count}"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_high_concurrency_server_creation(client: AsyncClient):
"""Test high concurrency with unique servers."""
async def create_unique_server(index: int):
server_data = {"name": f"server-{uuid.uuid4()}-{index}", "description": f"Server {index}", "transport": "sse", "url": f"http://example.com/sse{index}"}
return await client.post("/servers", json={"server": server_data, "visibility": "public"}, headers=TEST_AUTH_HEADER)
# Create 50 unique servers concurrently
results = await asyncio.gather(*[create_unique_server(i) for i in range(50)], return_exceptions=True)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 201)
failed_count = sum(1 for r in results if isinstance(r, Exception) or (hasattr(r, "status_code") and r.status_code != 201))
# Allow for occasional failures due to connection pool exhaustion under extreme concurrency
# At least 48 out of 50 should succeed (96% success rate)
assert success_count >= 48, f"Expected at least 48 successful creations, got {success_count} (failed: {failed_count})"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_concurrent_team_server_creation_same_name(client: AsyncClient):
"""Test concurrent team server creation with same name prevents duplicates."""
server_name = f"team-server-{uuid.uuid4()}"
async def create_team_server():
server_data = {"server": {"name": server_name, "description": "Team server", "transport": "sse", "url": "http://example.com/sse"}, "team_id": None, "visibility": "team"}
return await client.post("/servers", json=server_data, headers=TEST_AUTH_HEADER)
# Run 10 concurrent creations with same name
results = await asyncio.gather(*[create_team_server() for _ in range(10)], return_exceptions=True)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 201)
conflict_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 409)
# Exactly one should succeed, rest should be conflicts
assert success_count == 1, f"Expected 1 success, got {success_count}"
assert conflict_count == 9, f"Expected 9 conflicts, got {conflict_count}"
# ============================================================================
# SKIP_LOCKED BEHAVIOR TESTS FOR NEW SERVICES
# ============================================================================
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_skip_locked_behavior_prompt_updates(client: AsyncClient):
"""Test that skip_locked allows concurrent prompt operations to proceed without blocking."""
# Create multiple prompts
prompt_ids = []
for i in range(5):
prompt_name = f"skip-lock-prompt-{i}-{uuid.uuid4()}"
prompt_data = {"name": prompt_name, "description": f"Skip lock test prompt {i}", "template": f"Skip lock template {i}", "arguments": "[]", "visibility": "public"}
resp = await client.post("/admin/prompts", data=prompt_data, headers=TEST_AUTH_HEADER)
assert resp.status_code == 200, f"Failed to create prompt {i}: {resp.status_code}"
# Get prompt ID
list_resp = await client.get("/prompts", headers=TEST_AUTH_HEADER)
prompts = list_resp.json() # Returns list directly, not {"data": [...]}
prompt = next((p for p in prompts if p["name"] == prompt_name), None)
if prompt:
prompt_ids.append(prompt["id"])
async def update_prompt(prompt_id: str, index: int):
prompt_name = f"updated-prompt-{index}-{uuid.uuid4()}"
update_data = {"name": prompt_name, "description": f"Updated description {index}", "template": f"Updated template {index}", "arguments": "[]"}
return await client.post(f"/admin/prompts/{prompt_id}/edit", data=update_data, headers=TEST_AUTH_HEADER)
# Update all prompts concurrently
results = await asyncio.gather(*[update_prompt(prompt_id, i) for i, prompt_id in enumerate(prompt_ids)], return_exceptions=True)
# All should succeed (different prompts, skip_locked allows parallel processing)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
assert success_count == len(prompt_ids), f"Expected {len(prompt_ids)} successes, got {success_count}"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_skip_locked_behavior_a2a_updates(client: AsyncClient):
"""Test that skip_locked allows concurrent A2A agent operations to proceed without blocking."""
# Create multiple agents
agent_ids = []
for i in range(5):
agent_name = f"skip-lock-agent-{i}-{uuid.uuid4()}"
agent_data = {"name": agent_name, "description": f"Skip lock test agent {i}", "endpoint_url": f"http://example.com/agent{i}"}
resp = await client.post("/a2a", json={"agent": agent_data, "visibility": "public"}, headers=TEST_AUTH_HEADER)
assert resp.status_code == 201, f"Failed to create agent {i}: {resp.status_code} - {resp.text}"
# Get agent ID
list_resp = await client.get("/a2a", headers=TEST_AUTH_HEADER)
agents = list_resp.json()
if isinstance(agents, dict) and "agents" in agents:
agents = agents["agents"]
agent = next((a for a in agents if a["name"] == agent_name), None)
if agent:
agent_ids.append(agent["id"])
async def update_agent(agent_id: str, index: int):
agent_name = f"updated-agent-{index}-{uuid.uuid4()}"
update_data = {"name": agent_name, "description": f"Updated description {index}", "endpoint_url": f"http://example.com/updated{index}"}
return await client.put(f"/a2a/{agent_id}", json={"agent": update_data}, headers=TEST_AUTH_HEADER)
# Update all agents concurrently
results = await asyncio.gather(*[update_agent(agent_id, i) for i, agent_id in enumerate(agent_ids)], return_exceptions=True)
# All should succeed (different agents, skip_locked allows parallel processing)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
assert success_count == len(agent_ids), f"Expected {len(agent_ids)} successes, got {success_count}"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_skip_locked_behavior_server_updates(client: AsyncClient):
"""Test that skip_locked allows concurrent server operations to proceed without blocking."""
# Create multiple servers
server_ids = []
for i in range(5):
server_name = f"skip-lock-server-{i}-{uuid.uuid4()}"
server_data = {"name": server_name, "description": f"Skip lock test server {i}", "transport": "sse", "url": f"http://example.com/sse{i}"}
resp = await client.post("/servers", json={"server": server_data, "team_id": None, "visibility": "public"}, headers=TEST_AUTH_HEADER)
assert resp.status_code == 201, f"Failed to create server {i}: {resp.status_code}"
# Get server ID from creation response
created_server = resp.json()
if created_server and "id" in created_server:
server_ids.append(created_server["id"])
async def update_server(server_id: str, index: int):
server_name = f"updated-server-{index}-{uuid.uuid4()}"
update_data = {"name": server_name, "description": f"Updated description {index}", "transport": "sse", "url": f"http://example.com/updated{index}"}
return await client.put(f"/servers/{server_id}", json={"server": update_data}, headers=TEST_AUTH_HEADER)
# Update all servers concurrently
results = await asyncio.gather(*[update_server(server_id, i) for i, server_id in enumerate(server_ids)], return_exceptions=True)
# All should succeed (different servers, skip_locked allows parallel processing)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
assert success_count == len(server_ids), f"Expected {len(server_ids)} successes, got {success_count}"
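# The three tests above check that updates to *different* rows proceed in parallel,
# which matches the semantics of PostgreSQL's SELECT ... FOR UPDATE SKIP LOCKED: a row
# locked by another transaction is skipped instead of making the caller wait. A hedged
# sketch of issuing that query through SQLAlchemy is below; the model class is a
# placeholder, not the actual mcpgateway service code.
def _lock_available_rows(session, model, ids):
    """Illustrative query that locks only the rows no other transaction holds."""
    from sqlalchemy import select
    stmt = select(model).where(model.id.in_(ids)).with_for_update(skip_locked=True)
    # Rows currently locked elsewhere are silently omitted from the result.
    return session.execute(stmt).scalars().all()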
# ============================================================================
# MIXED VISIBILITY TESTS FOR NEW SERVICES
# ============================================================================
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_mixed_visibility_concurrent_prompt_operations(client: AsyncClient):
"""Test concurrent prompt operations with different visibility levels."""
base_uuid = uuid.uuid4()
async def create_public_prompt():
prompt_data = {"name": f"mixed-vis-public-prompt-{base_uuid}", "description": "Public prompt", "template": "Public template", "arguments": "[]", "visibility": "public"}
return await client.post("/admin/prompts", data=prompt_data, headers=TEST_AUTH_HEADER)
async def create_team_prompt():
prompt_data = {"name": f"mixed-vis-team-prompt-{base_uuid}", "description": "Team prompt", "template": "Team template", "arguments": "[]", "visibility": "team"}
return await client.post("/admin/prompts", data=prompt_data, headers=TEST_AUTH_HEADER)
async def create_private_prompt():
prompt_data = {"name": f"mixed-vis-private-prompt-{base_uuid}", "description": "Private prompt", "template": "Private template", "arguments": "[]", "visibility": "private"}
return await client.post("/admin/prompts", data=prompt_data, headers=TEST_AUTH_HEADER)
# Create prompts with different names and visibility concurrently
results = await asyncio.gather(*[create_public_prompt() for _ in range(3)], *[create_team_prompt() for _ in range(3)], *[create_private_prompt() for _ in range(3)], return_exceptions=True)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
conflict_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 409)
other_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code not in [200, 409])
exception_count = sum(1 for r in results if isinstance(r, Exception))
# Debug output
if other_count > 0 or exception_count > 0:
print(f"\nDebug: {success_count} success, {conflict_count} conflicts, {other_count} other, {exception_count} exceptions")
for i, r in enumerate(results):
if isinstance(r, Exception):
print(f" Result {i}: Exception - {type(r).__name__}")
elif r.status_code not in [200, 409]:
print(f" Result {i}: Status {r.status_code}")
# Expect 3 successes (one per unique name); the remaining 6 should mostly be
# conflicts, allowing a couple to fail instead under resource exhaustion
# (timeouts, connection errors), hence the >= 4 lower bound below.
assert success_count == 3, f"Expected 3 successes, got {success_count}"
assert conflict_count >= 4, f"Expected at least 4 conflicts, got {conflict_count} (other: {other_count}, exceptions: {exception_count})"
assert success_count + conflict_count + other_count + exception_count == 9, "Total should be 9 requests"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_mixed_visibility_concurrent_a2a_operations(client: AsyncClient):
"""Test concurrent A2A agent operations with different visibility levels."""
base_uuid = uuid.uuid4()
async def create_public_agent():
agent_data = {"name": f"mixed-vis-public-agent-{base_uuid}", "description": "Public agent", "endpoint_url": "http://example.com/public"}
return await client.post("/a2a", json={"agent": agent_data, "visibility": "public"}, headers=TEST_AUTH_HEADER)
async def create_team_agent():
agent_data = {"name": f"mixed-vis-team-agent-{base_uuid}", "description": "Team agent", "endpoint_url": "http://example.com/team"}
return await client.post("/a2a", json={"agent": agent_data, "visibility": "team"}, headers=TEST_AUTH_HEADER)
async def create_private_agent():
agent_data = {"name": f"mixed-vis-private-agent-{base_uuid}", "description": "Private agent", "endpoint_url": "http://example.com/private"}
return await client.post("/a2a", json={"agent": agent_data, "visibility": "private"}, headers=TEST_AUTH_HEADER)
# Create agents with different names and visibility concurrently
results = await asyncio.gather(*[create_public_agent() for _ in range(3)], *[create_team_agent() for _ in range(3)], *[create_private_agent() for _ in range(3)], return_exceptions=True)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 201)
conflict_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 409)
other_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code not in [201, 409])
exception_count = sum(1 for r in results if isinstance(r, Exception))
# With a different name per visibility level, the uniqueness constraint
# (team_id, owner_email, slug) scopes each agent separately, so more than one
# creation can succeed overall. At minimum, one agent per unique
# (name, visibility) combination should succeed.
assert success_count >= 3, f"Expected at least 3 successes, got {success_count}"
assert success_count + conflict_count + other_count + exception_count == 9, "Total should be 9 requests"
@pytest.mark.asyncio
@SKIP_IF_NOT_POSTGRES
async def test_mixed_visibility_concurrent_server_operations(client: AsyncClient):
"""Test concurrent server operations with different visibility levels."""
base_uuid = uuid.uuid4()
async def create_public_server():
server_data = {"name": f"mixed-vis-public-server-{base_uuid}", "description": "Public server", "transport": "sse", "url": "http://example.com/public"}
return await client.post("/servers", json={"server": server_data, "team_id": None, "visibility": "public"}, headers=TEST_AUTH_HEADER)
async def create_team_server():
server_data = {"name": f"mixed-vis-team-server-{base_uuid}", "description": "Team server", "transport": "sse", "url": "http://example.com/team"}
return await client.post("/servers", json={"server": server_data, "team_id": None, "visibility": "team"}, headers=TEST_AUTH_HEADER)
async def create_private_server():
server_data = {"name": f"mixed-vis-private-server-{base_uuid}", "description": "Private server", "transport": "sse", "url": "http://example.com/private"}
return await client.post("/servers", json={"server": server_data, "team_id": None, "visibility": "private"}, headers=TEST_AUTH_HEADER)
# Create servers with different names and visibility concurrently
results = await asyncio.gather(*[create_public_server() for _ in range(3)], *[create_team_server() for _ in range(3)], *[create_private_server() for _ in range(3)], return_exceptions=True)
success_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 201)
conflict_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 409)
other_count = sum(1 for r in results if not isinstance(r, Exception) and r.status_code not in [201, 409])
exception_count = sum(1 for r in results if isinstance(r, Exception))
# With a different name per visibility level, the uniqueness constraint
# (team_id, owner_email, name) scopes each server separately, so more than one
# creation can succeed overall. At minimum, one server per unique
# (name, visibility) combination should succeed.
assert success_count >= 3, f"Expected at least 3 successes, got {success_count}"
assert success_count + conflict_count + other_count + exception_count == 9, "Total should be 9 requests"
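# The comments above reference a uniqueness constraint scoped by
# (team_id, owner_email, name/slug), which is why the same base name can coexist
# across visibility scopes. A hedged sketch of declaring such a scoped constraint on a
# SQLAlchemy model is below; the table and column names are placeholders and do not
# mirror mcpgateway's actual schema.
def _example_scoped_model():
    """Build an illustrative model whose name is unique only within its owning scope."""
    from sqlalchemy import Column, String, UniqueConstraint
    from sqlalchemy.orm import declarative_base
    Base = declarative_base()
    class ScopedServer(Base):
        __tablename__ = "example_scoped_servers"
        id = Column(String, primary_key=True)
        team_id = Column(String, nullable=True)
        owner_email = Column(String, nullable=False)
        name = Column(String, nullable=False)
        visibility = Column(String, nullable=False, default="private")
        # The same name may repeat across scopes; it only collides within one scope.
        # Note: with a nullable team_id, PostgreSQL treats NULLs as distinct,
        # so personal-scope rows need their own handling.
        __table_args__ = (UniqueConstraint("team_id", "owner_email", "name", name="uq_example_scoped_name"),)
    return ScopedServer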