Skip to main content
Glama

Hostaway MCP Server

test_rate_limiting.py (11.2 kB)
"""Performance tests for rate limiting under load.

Tests rate limiter behavior and effectiveness under various load patterns:
- Burst traffic handling
- Rate limit enforcement
- Queue backpressure
- Concurrent request limiting
"""

import asyncio
import time
from typing import Any

import pytest

from src.mcp.config import HostawayConfig


@pytest.mark.performance
@pytest.mark.asyncio
class TestRateLimitingPerformance:
    """Performance tests for rate limiter under load."""

    async def test_rate_limit_enforcement(self) -> None:
        """Test that rate limiter enforces configured limits.

        Validates:
        - IP rate limit (15 req/10s) is enforced
        - Account rate limit (20 req/10s) is enforced
        - Requests are queued, not rejected

        This test uses a lower rate limit for faster testing.
        """
        from src.services.rate_limiter import RateLimiter

        # Create rate limiter with low limits for testing
        rate_limiter = RateLimiter(
            ip_rate_limit=10,  # 10 requests per 10 seconds
            account_rate_limit=10,
            max_concurrent=5,
        )

        start_time = time.time()
        # (request_id, seconds since start_time) in the order slots were acquired.
        request_times: list[tuple[int, float]] = []

        async def make_request(request_id: int) -> int:
            """Make a rate-limited request and record timing."""
            async with rate_limiter.acquire():
                request_times.append((request_id, time.time() - start_time))
                return request_id

        # Try to make 25 requests (2.5x the rate limit)
        tasks = [make_request(i) for i in range(25)]
        results = await asyncio.gather(*tasks)

        # Verify all requests completed
        assert len(results) == 25, "All requests should complete (queued, not rejected)"

        # Verify rate limiting worked - should take at least 10 seconds
        # First 10 requests immediate, next 15 after 10s
        elapsed_time = time.time() - start_time
        assert elapsed_time >= 10.0, f"Should take ≥10s for 25 requests, took {elapsed_time:.2f}s"

        # Verify requests were distributed over time (not all at once)
        first_10 = [t for _, t in request_times[:10]]
        last_15 = [t for _, t in request_times[10:]]

        # First 10 should complete quickly (within 1 second)
        assert max(first_10) < 1.0, "First 10 requests should complete immediately"

        # Last 15 should be delayed (after 10+ seconds)
        assert min(last_15) >= 10.0, "Requests 11-25 should be delayed by rate limit"

    async def test_concurrent_limit_enforcement(self) -> None:
        """Test that max concurrent requests limit is enforced.

        Validates:
        - No more than max_concurrent requests execute simultaneously
        - Excess requests are queued
        - Queue processes in order
        """
        from src.services.rate_limiter import RateLimiter

        max_concurrent = 5
        rate_limiter = RateLimiter(
            ip_rate_limit=100,  # High rate limit to not interfere
            account_rate_limit=100,
            max_concurrent=max_concurrent,
        )

        active_count = 0
        max_active = 0
        lock = asyncio.Lock()

        async def make_request(request_id: int) -> tuple[int, int]:
            """Track concurrent request count."""
            nonlocal active_count, max_active

            async with rate_limiter.acquire():
                async with lock:
                    active_count += 1
                    max_active = max(max_active, active_count)

                # Simulate some work
                await asyncio.sleep(0.1)

                async with lock:
                    active_count -= 1

                return request_id, max_active

        # Launch 20 requests (4x the concurrent limit)
        tasks = [make_request(i) for i in range(20)]
        results = await asyncio.gather(*tasks)

        # Verify all completed
        assert len(results) == 20

        # Verify concurrent limit was never exceeded
        assert (
            max_active <= max_concurrent
        ), f"Max concurrent {max_active} exceeded limit {max_concurrent}"

    async def test_burst_traffic_handling(self, test_config: HostawayConfig) -> None:
        """Test rate limiter handles burst traffic without errors.

        Simulates realistic burst pattern:
        - Idle period
        - Sudden burst of requests
        - Gradual decline
        - Repeat

        The client and token manager are always closed, even when an
        assertion fails or an unexpected exception escapes the bursts.

        Args:
            test_config: Test configuration with credentials
        """
        from src.mcp.auth import TokenManager
        from src.services.hostaway_client import HostawayClient
        from src.services.rate_limiter import RateLimiter

        rate_limiter = RateLimiter(
            ip_rate_limit=test_config.rate_limit_ip,
            account_rate_limit=test_config.rate_limit_account,
            max_concurrent=test_config.max_concurrent_requests,
        )

        token_manager = TokenManager(config=test_config)
        client = HostawayClient(
            config=test_config,
            token_manager=token_manager,
            rate_limiter=rate_limiter,
        )

        async def make_request() -> dict[str, Any]:
            return await client.get_listings(limit=1)

        # Burst pattern: 30 requests, wait, 20 requests, wait, 10 requests
        burst_sizes = (30, 20, 10)
        total_errors = 0

        try:
            for burst_size in burst_sizes:
                tasks = [make_request() for _ in range(burst_size)]
                # return_exceptions=True so one failed call does not abort the burst.
                results = await asyncio.gather(*tasks, return_exceptions=True)
                total_errors += sum(isinstance(r, Exception) for r in results)

                # Wait between bursts
                await asyncio.sleep(2.0)

            # Verify low error rate; the denominator follows the burst pattern
            # so the two cannot drift apart.
            total_requests = sum(burst_sizes)
            error_rate = (total_errors / total_requests) * 100
            assert error_rate < 5.0, f"Error rate {error_rate:.2f}% exceeds 5% threshold"
        finally:
            # Cleanup runs on success and failure; the nested finally ensures the
            # token manager is closed even if closing the client raises.
            try:
                await client.aclose()
            finally:
                await token_manager.aclose()

    async def test_rate_limit_recovery(self) -> None:
        """Test that rate limiter recovers after hitting limits.

        Validates:
        - Rate limits reset after time window
        - System returns to normal after congestion
        - No permanent degradation
        """
        from src.services.rate_limiter import RateLimiter

        rate_limiter = RateLimiter(
            ip_rate_limit=10,  # 10 req/10s
            account_rate_limit=10,
            max_concurrent=5,
        )

        async def make_request() -> None:
            async with rate_limiter.acquire():
                await asyncio.sleep(0.01)

        # Phase 1: Hit rate limit (15 requests)
        phase1_start = time.time()
        await asyncio.gather(*[make_request() for _ in range(15)])
        phase1_duration = time.time() - phase1_start

        # Should take ~10s due to rate limiting
        assert phase1_duration >= 10.0, "Phase 1 should hit rate limit"

        # Phase 2: Wait for window to reset
        # NOTE(review): whether a 1s pause is enough for 10 fresh requests to pass
        # without queueing depends on RateLimiter's windowing algorithm, which is
        # not visible from this test - confirm against the implementation.
        await asyncio.sleep(1.0)

        # Phase 3: Make new requests (should be fast again)
        phase3_start = time.time()
        await asyncio.gather(*[make_request() for _ in range(10)])
        phase3_duration = time.time() - phase3_start

        # Should complete quickly (within rate limit)
        assert (
            phase3_duration < 2.0
        ), f"Phase 3 should be fast after recovery, took {phase3_duration:.2f}s"

    async def test_rate_limiter_fairness(self) -> None:
        """Test that rate limiter distributes capacity fairly across requests.

        Validates:
        - FIFO queue ordering
        - No request starvation
        - Predictable delays

        Ordering is checked loosely: consecutive completions may differ by at
        most 10 request ids.
        """
        from src.services.rate_limiter import RateLimiter

        rate_limiter = RateLimiter(
            ip_rate_limit=10,
            account_rate_limit=10,
            max_concurrent=5,
        )

        completion_order: list[int] = []
        lock = asyncio.Lock()

        async def make_request(request_id: int) -> int:
            """Make request and record completion order."""
            async with rate_limiter.acquire():
                await asyncio.sleep(0.01)
                async with lock:
                    completion_order.append(request_id)
                return request_id

        # Launch 20 requests in order
        tasks = [make_request(i) for i in range(20)]
        await asyncio.gather(*tasks)

        # Verify roughly FIFO order (allowing for some concurrency)
        # First 5 can be in any order (concurrent)
        # But overall trend should be sequential
        for i in range(len(completion_order) - 1):
            # Check that we don't have extreme out-of-order (>10 positions)
            position_delta = abs(completion_order[i + 1] - completion_order[i])
            assert position_delta <= 10, (
                f"Request order too scrambled at position {i}: "
                f"{completion_order[i]} -> {completion_order[i + 1]}"
            )

    @pytest.mark.slow
    async def test_sustained_rate_limiting(self, test_config: HostawayConfig) -> None:
        """Test rate limiter performance under sustained load over time.

        Runs for 30 seconds with constant request rate to validate:
        - No rate limiter degradation over time
        - Consistent throughput
        - Memory stability

        The client and token manager are always closed, even when an
        assertion fails or an unexpected exception escapes the load loop.

        Args:
            test_config: Test configuration with credentials
        """
        from src.mcp.auth import TokenManager
        from src.services.hostaway_client import HostawayClient
        from src.services.rate_limiter import RateLimiter

        rate_limiter = RateLimiter(
            ip_rate_limit=test_config.rate_limit_ip,
            account_rate_limit=test_config.rate_limit_account,
            max_concurrent=test_config.max_concurrent_requests,
        )

        token_manager = TokenManager(config=test_config)
        client = HostawayClient(
            config=test_config,
            token_manager=token_manager,
            rate_limiter=rate_limiter,
        )

        async def make_request() -> dict[str, Any]:
            return await client.get_listings(limit=1)

        start_time = time.time()
        total_requests = 0
        total_errors = 0

        try:
            # Run for 30 seconds
            while time.time() - start_time < 30.0:
                # Launch 10 concurrent requests
                tasks = [make_request() for _ in range(10)]
                # return_exceptions=True so one failed call does not abort the wave.
                results = await asyncio.gather(*tasks, return_exceptions=True)

                total_requests += len(results)
                total_errors += sum(isinstance(r, Exception) for r in results)

                # Brief pause before next wave
                await asyncio.sleep(0.5)

            # Verify low error rate throughout
            error_rate = (total_errors / total_requests) * 100 if total_requests > 0 else 0
            assert error_rate < 2.0, f"Error rate {error_rate:.2f}% exceeds 2% threshold"

            # Verify we processed reasonable number of requests (>= 50)
            assert (
                total_requests >= 50
            ), f"Should process >= 50 requests in 30s, got {total_requests}"
        finally:
            # Cleanup runs on success and failure; the nested finally ensures the
            # token manager is closed even if closing the client raises.
            try:
                await client.aclose()
            finally:
                await token_manager.aclose()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/darrentmorgan/hostaway-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.