Hostaway MCP Server

test_load.py (10.4 kB)
"""Load tests for Hostaway MCP server performance. Tests server behavior under concurrent load to validate: - Rate limiting works correctly - Connection pooling handles concurrent requests - Response times remain acceptable under load - No resource leaks or errors """ import asyncio from typing import Any import pytest from src.mcp.config import HostawayConfig @pytest.mark.performance @pytest.mark.asyncio class TestLoadPerformance: """Load testing for concurrent request handling.""" async def test_concurrent_listing_requests(self, test_config: HostawayConfig) -> None: """Test 100 concurrent listing requests without errors. Validates: - All requests complete successfully - Rate limiting prevents API lockout - Connection pooling handles concurrency - No request exceeds 5 second timeout Args: test_config: Test configuration with credentials """ from src.mcp.auth import TokenManager from src.services.hostaway_client import HostawayClient from src.services.rate_limiter import RateLimiter rate_limiter = RateLimiter( ip_rate_limit=test_config.rate_limit_ip, account_rate_limit=test_config.rate_limit_account, max_concurrent=test_config.max_concurrent_requests, ) token_manager = TokenManager(config=test_config) client = HostawayClient( config=test_config, token_manager=token_manager, rate_limiter=rate_limiter, ) async def make_request() -> dict[str, Any]: """Make a single listing request.""" return await client.get_listings(limit=5) # Execute 100 concurrent requests tasks = [make_request() for _ in range(100)] results = await asyncio.gather(*tasks, return_exceptions=True) # Verify all requests succeeded errors = [r for r in results if isinstance(r, Exception)] assert len(errors) == 0, f"Expected no errors, got {len(errors)}: {errors[:3]}" # Verify all results are lists successful = [r for r in results if isinstance(r, list)] assert len(successful) == 100, f"Expected 100 successful results, got {len(successful)}" # Cleanup await client.aclose() await token_manager.aclose() async def test_concurrent_booking_searches(self, test_config: HostawayConfig) -> None: """Test 100 concurrent booking search requests. Args: test_config: Test configuration with credentials """ from src.mcp.auth import TokenManager from src.services.hostaway_client import HostawayClient from src.services.rate_limiter import RateLimiter rate_limiter = RateLimiter( ip_rate_limit=test_config.rate_limit_ip, account_rate_limit=test_config.rate_limit_account, max_concurrent=test_config.max_concurrent_requests, ) token_manager = TokenManager(config=test_config) client = HostawayClient( config=test_config, token_manager=token_manager, rate_limiter=rate_limiter, ) async def make_request() -> dict[str, Any]: """Make a single booking search request.""" return await client.search_bookings(limit=5) # Execute 100 concurrent requests tasks = [make_request() for _ in range(100)] results = await asyncio.gather(*tasks, return_exceptions=True) # Verify all requests succeeded errors = [r for r in results if isinstance(r, Exception)] assert len(errors) == 0, f"Expected no errors, got {len(errors)}: {errors[:3]}" # Verify all results are lists successful = [r for r in results if isinstance(r, list)] assert len(successful) == 100, f"Expected 100 successful results, got {len(successful)}" # Cleanup await client.aclose() await token_manager.aclose() async def test_mixed_concurrent_requests(self, test_config: HostawayConfig) -> None: """Test 100 concurrent requests of mixed types (listings, bookings, financials). 
Validates: - Server handles diverse concurrent operations - Rate limiting works across different endpoints - No cross-request interference Args: test_config: Test configuration with credentials """ from datetime import date, timedelta from src.mcp.auth import TokenManager from src.services.hostaway_client import HostawayClient from src.services.rate_limiter import RateLimiter rate_limiter = RateLimiter( ip_rate_limit=test_config.rate_limit_ip, account_rate_limit=test_config.rate_limit_account, max_concurrent=test_config.max_concurrent_requests, ) token_manager = TokenManager(config=test_config) client = HostawayClient( config=test_config, token_manager=token_manager, rate_limiter=rate_limiter, ) today = date.today() start_date = (today - timedelta(days=30)).isoformat() end_date = today.isoformat() async def make_listing_request() -> dict[str, Any]: return await client.get_listings(limit=5) async def make_booking_request() -> dict[str, Any]: return await client.search_bookings(limit=5) async def make_financial_request() -> dict[str, Any]: return await client.get_financial_report( start_date=start_date, end_date=end_date, ) # Create mix of 100 requests (33/33/34 distribution) tasks = ( [make_listing_request() for _ in range(33)] + [make_booking_request() for _ in range(33)] + [make_financial_request() for _ in range(34)] ) # Execute all concurrently results = await asyncio.gather(*tasks, return_exceptions=True) # Verify all requests succeeded errors = [r for r in results if isinstance(r, Exception)] assert len(errors) == 0, f"Expected no errors, got {len(errors)}: {errors[:3]}" # Verify we got 100 results assert len(results) == 100, f"Expected 100 results, got {len(results)}" # Cleanup await client.aclose() await token_manager.aclose() async def test_token_reuse_under_load(self, test_config: HostawayConfig) -> None: """Test that token is reused correctly under concurrent load. Validates: - Single token acquisition for all requests - No token refresh during load (unless necessary) - Thread-safe token sharing Args: test_config: Test configuration with credentials """ from src.mcp.auth import TokenManager from src.services.hostaway_client import HostawayClient from src.services.rate_limiter import RateLimiter rate_limiter = RateLimiter( ip_rate_limit=test_config.rate_limit_ip, account_rate_limit=test_config.rate_limit_account, max_concurrent=test_config.max_concurrent_requests, ) token_manager = TokenManager(config=test_config) client = HostawayClient( config=test_config, token_manager=token_manager, rate_limiter=rate_limiter, ) # Get initial token initial_token = await token_manager.get_token() initial_access_token = initial_token.access_token async def make_request() -> dict[str, Any]: """Make a request and verify token is reused.""" return await client.get_listings(limit=1) # Execute 50 concurrent requests tasks = [make_request() for _ in range(50)] results = await asyncio.gather(*tasks, return_exceptions=True) # Verify all succeeded errors = [r for r in results if isinstance(r, Exception)] assert len(errors) == 0, f"Expected no errors, got {len(errors)}" # Verify token is still the same (not refreshed unnecessarily) current_token = await token_manager.get_token() assert ( current_token.access_token == initial_access_token ), "Token should be reused, not refreshed" # Cleanup await client.aclose() await token_manager.aclose() @pytest.mark.slow async def test_sustained_load(self, test_config: HostawayConfig) -> None: """Test server performance under sustained load over time. 
Runs 500 requests in waves of 50 concurrent requests. Validates system stability over extended operation. Args: test_config: Test configuration with credentials """ from src.mcp.auth import TokenManager from src.services.hostaway_client import HostawayClient from src.services.rate_limiter import RateLimiter rate_limiter = RateLimiter( ip_rate_limit=test_config.rate_limit_ip, account_rate_limit=test_config.rate_limit_account, max_concurrent=test_config.max_concurrent_requests, ) token_manager = TokenManager(config=test_config) client = HostawayClient( config=test_config, token_manager=token_manager, rate_limiter=rate_limiter, ) async def make_request() -> dict[str, Any]: return await client.get_listings(limit=5) total_requests = 0 total_errors = 0 # Run 10 waves of 50 concurrent requests each for wave in range(10): tasks = [make_request() for _ in range(50)] results = await asyncio.gather(*tasks, return_exceptions=True) total_requests += len(results) wave_errors = [r for r in results if isinstance(r, Exception)] total_errors += len(wave_errors) # Small delay between waves to simulate realistic usage await asyncio.sleep(0.5) # Verify low error rate (< 1%) error_rate = (total_errors / total_requests) * 100 if total_requests > 0 else 0 assert error_rate < 1.0, f"Error rate {error_rate:.2f}% exceeds 1% threshold" # Cleanup await client.aclose() await token_manager.aclose()
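These tests expect a test_config fixture that supplies a HostawayConfig with working credentials; that fixture lives elsewhere in the test suite and is not shown on this page. As a rough sketch only, and assuming HostawayConfig loads its credentials and rate-limit settings from the environment, a conftest.py might look like this:

# conftest.py -- hypothetical sketch; the repository's actual fixture may differ.
import pytest

from src.mcp.config import HostawayConfig


@pytest.fixture
def test_config() -> HostawayConfig:
    # Assumes HostawayConfig reads its credentials and rate-limit settings
    # from environment variables (e.g. via pydantic-settings).
    return HostawayConfig()

The performance and slow markers should also be registered in the project's pytest configuration (otherwise pytest emits unknown-marker warnings); the suite can then be run selectively with pytest -m performance, or the long-running test excluded with pytest -m "not slow".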

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/darrentmorgan/hostaway-mcp'
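For programmatic access, here is a minimal Python sketch of the same call, using httpx as an example HTTP client (any client works); the response schema is whatever the API returns and is not documented on this page:

# Fetch the Hostaway MCP server entry from the Glama MCP directory API.
import httpx

url = "https://glama.ai/api/mcp/v1/servers/darrentmorgan/hostaway-mcp"
response = httpx.get(url, timeout=10.0)
response.raise_for_status()

# The schema is defined by the API; print the JSON payload as-is.
print(response.json())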

If you have feedback or need assistance with the MCP directory API, please join our Discord server.