Skip to main content
Glama
test_error_handling.py7.15 kB
"""Integration tests for batch operations and error handling.""" import pytest from src.mcp.auth import TokenManager from src.mcp.config import HostawayConfig from src.services.hostaway_client import HostawayClient from src.services.rate_limiter import RateLimiter @pytest.fixture async def hostaway_client(): """Create HostawayClient for testing. Uses test configuration and creates a real client instance for integration testing with batch operations. """ config = HostawayConfig() # type: ignore[call-arg] token_manager = TokenManager(config=config) rate_limiter = RateLimiter() client = HostawayClient( config=config, token_manager=token_manager, rate_limiter=rate_limiter, ) yield client await client.aclose() @pytest.mark.asyncio async def test_batch_operations_all_success(hostaway_client): """Test batch operations where all succeed.""" async def mock_success_op(value: int): """Mock operation that always succeeds.""" return {"status": "success", "id": value} operations = [lambda v=i: mock_success_op(v) for i in range(3)] result = await hostaway_client.execute_batch(operations) assert result.total_count == 3 assert result.success_count == 3 assert result.failure_count == 0 assert result.has_successes is True assert result.has_failures is False assert len(result.successful) == 3 assert result.partial_success is False @pytest.mark.asyncio async def test_batch_operations_mixed_results(hostaway_client): """Test batch operations with mixed success/failure.""" async def success_op(): """Operation that succeeds.""" return {"status": "success"} async def failure_op(): """Operation that fails.""" raise ValueError("Test failure") operations = [success_op, failure_op, success_op] operation_ids = ["op_1", "op_2", "op_3"] result = await hostaway_client.execute_batch(operations, operation_ids) assert result.total_count == 3 assert result.success_count == 2 assert result.failure_count == 1 assert result.partial_success is True # Check failed operation details assert len(result.failed) == 1 assert result.failed[0].operation_id == "op_2" assert "Test failure" in result.failed[0].error # type: ignore[operator] # Check successful operations assert len(result.successful) == 2 assert result.successful[0].operation_id == "op_1" assert result.successful[1].operation_id == "op_3" @pytest.mark.asyncio async def test_batch_operations_all_failed(hostaway_client): """Test batch operations where all fail.""" async def failure_op(error_msg: str): """Operation that fails with specific message.""" raise RuntimeError(error_msg) operations = [ lambda: failure_op("Error 1"), lambda: failure_op("Error 2"), ] result = await hostaway_client.execute_batch(operations) assert result.total_count == 2 assert result.success_count == 0 assert result.failure_count == 2 assert result.has_failures is True assert result.has_successes is False assert len(result.failed) == 2 assert result.partial_success is False @pytest.mark.asyncio async def test_batch_operations_preserves_order(hostaway_client): """Test that batch operations preserve execution order.""" results_order = [] async def tracked_op(value: int): """Operation that tracks execution order.""" results_order.append(value) return {"value": value} operations = [ lambda: tracked_op(1), lambda: tracked_op(2), lambda: tracked_op(3), ] result = await hostaway_client.execute_batch(operations) # Verify operations executed in order assert results_order == [1, 2, 3] assert result.success_count == 3 # Verify results are in order assert result.successful[0].data["value"] == 1 # type: ignore[index] assert result.successful[1].data["value"] == 2 # type: ignore[index] assert result.successful[2].data["value"] == 3 # type: ignore[index] @pytest.mark.asyncio async def test_batch_operations_with_different_error_types(hostaway_client): """Test batch operations with different exception types.""" async def value_error_op(): raise ValueError("Value error") async def type_error_op(): raise TypeError("Type error") async def success_op(): return {"status": "ok"} operations = [value_error_op, type_error_op, success_op] result = await hostaway_client.execute_batch(operations) assert result.total_count == 3 assert result.success_count == 1 assert result.failure_count == 2 # Check error types are captured assert "ValueError" in str(result.failed[0].error) or "Value error" in str( result.failed[0].error ) assert "TypeError" in str(result.failed[1].error) or "Type error" in str(result.failed[1].error) @pytest.mark.asyncio async def test_batch_operations_empty_list(hostaway_client): """Test batch operations with empty operations list.""" operations = [] # type: ignore[var-annotated] result = await hostaway_client.execute_batch(operations) assert result.total_count == 0 assert result.success_count == 0 assert result.failure_count == 0 assert result.has_successes is False assert result.has_failures is False @pytest.mark.asyncio async def test_batch_operations_custom_operation_ids(hostaway_client): """Test batch operations with custom operation IDs.""" async def simple_op(): return {"result": "done"} operations = [simple_op, simple_op, simple_op] operation_ids = ["custom_1", "custom_2", "custom_3"] result = await hostaway_client.execute_batch(operations, operation_ids) assert result.success_count == 3 assert result.successful[0].operation_id == "custom_1" assert result.successful[1].operation_id == "custom_2" assert result.successful[2].operation_id == "custom_3" @pytest.mark.asyncio async def test_batch_operations_auto_generated_ids(hostaway_client): """Test batch operations with auto-generated operation IDs.""" async def simple_op(): return {"result": "done"} operations = [simple_op, simple_op] result = await hostaway_client.execute_batch(operations) # Should auto-generate IDs like "op_0", "op_1" assert result.successful[0].operation_id == "op_0" assert result.successful[1].operation_id == "op_1" @pytest.mark.asyncio async def test_batch_operations_logs_failures(hostaway_client, caplog): """Test that batch operations log failures appropriately.""" async def failing_op(): raise RuntimeError("Intentional test failure") operations = [failing_op] operation_ids = ["test_op"] with caplog.at_level("WARNING"): result = await hostaway_client.execute_batch(operations, operation_ids) assert result.failure_count == 1 # Check that failure was logged assert any("Batch operation test_op failed" in record.message for record in caplog.records)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/darrentmorgan/hostaway-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server