Skip to main content
Glama
test_dap_event_routing.py10.8 kB
""" Unit tests for DAP event routing mechanism. Tests the improved event routing system that uses dedicated queues for different event types to avoid inefficient re-queuing. """ import queue import threading import time from unittest.mock import Mock, patch import pytest from mcp_debug_tool.dap_wrapper import DAPSyncWrapper class TestDAPEventRouting: """Test the dedicated queue event routing pattern.""" def test_event_queues_initialized(self): """Test that dedicated event queues are created on initialization.""" wrapper = DAPSyncWrapper() # Check that all expected queues exist expected_queues = [ 'initialized', 'stopped', 'continued', 'terminated', 'exited', 'output', 'breakpoint', 'thread' ] for event_type in expected_queues: assert event_type in wrapper.event_queues assert isinstance(wrapper.event_queues[event_type], queue.Queue) # Check that general queue exists assert isinstance(wrapper.general_event_queue, queue.Queue) def test_known_event_routed_to_dedicated_queue(self): """Test that known events are routed to their dedicated queues.""" wrapper = DAPSyncWrapper() # Create a test event test_event = { 'type': 'event', 'event': 'stopped', 'body': {'reason': 'breakpoint', 'threadId': 1} } # Route the event wrapper._on_dap_event(test_event) # Check that it went to the right queue assert wrapper.event_queues['stopped'].qsize() == 1 assert wrapper.general_event_queue.qsize() == 0 # Verify we can retrieve it retrieved = wrapper.event_queues['stopped'].get_nowait() assert retrieved == test_event def test_unknown_event_routed_to_general_queue(self): """Test that unknown events are routed to the general queue.""" wrapper = DAPSyncWrapper() # Create an unknown event type test_event = { 'type': 'event', 'event': 'customUnknownEvent', 'body': {} } # Route the event wrapper._on_dap_event(test_event) # Check that it went to general queue assert wrapper.general_event_queue.qsize() == 1 # Check that no dedicated queue was used for queue_obj in wrapper.event_queues.values(): assert queue_obj.qsize() == 0 # Verify we can retrieve it retrieved = wrapper.general_event_queue.get_nowait() assert retrieved == test_event def test_multiple_events_different_types(self): """Test routing multiple events of different types.""" wrapper = DAPSyncWrapper() events = [ {'type': 'event', 'event': 'initialized', 'body': {}}, {'type': 'event', 'event': 'stopped', 'body': {}}, {'type': 'event', 'event': 'output', 'body': {'output': 'test'}}, {'type': 'event', 'event': 'terminated', 'body': {}}, ] # Route all events for event in events: wrapper._on_dap_event(event) # Verify each went to the correct queue assert wrapper.event_queues['initialized'].qsize() == 1 assert wrapper.event_queues['stopped'].qsize() == 1 assert wrapper.event_queues['output'].qsize() == 1 assert wrapper.event_queues['terminated'].qsize() == 1 assert wrapper.general_event_queue.qsize() == 0 def test_wait_for_event_from_dedicated_queue(self): """Test waiting for an event that goes to a dedicated queue.""" wrapper = DAPSyncWrapper() # Event that will be sent test_event = {'type': 'event', 'event': 'initialized', 'body': {}} # Send event in background after short delay def send_event(): time.sleep(0.1) wrapper._on_dap_event(test_event) sender = threading.Thread(target=send_event) sender.start() # Wait for the event start = time.time() received = wrapper._wait_for_event('initialized', timeout=2.0) duration = time.time() - start sender.join() # Verify we got the event quickly assert received is not None assert received == test_event assert duration < 0.5 # Should be fast def test_wait_for_event_timeout(self): """Test that waiting times out correctly when event doesn't arrive.""" wrapper = DAPSyncWrapper() # Don't send any event start = time.time() received = wrapper._wait_for_event('stopped', timeout=0.5) duration = time.time() - start # Verify timeout occurred assert received is None assert 0.4 < duration < 0.7 # Should timeout around 0.5s def test_wait_for_unknown_event_type(self): """Test waiting for an event type not in dedicated queues.""" wrapper = DAPSyncWrapper() # Send an unknown event type test_event = {'type': 'event', 'event': 'customEvent', 'body': {}} def send_event(): time.sleep(0.1) wrapper._on_dap_event(test_event) sender = threading.Thread(target=send_event) sender.start() # Wait for the custom event (should search general queue) received = wrapper._wait_for_event('customEvent', timeout=2.0) sender.join() # Should still work, but uses general queue fallback assert received is not None assert received == test_event def test_concurrent_event_routing(self): """Test that concurrent event routing works correctly.""" wrapper = DAPSyncWrapper() # Send many events concurrently def send_events(event_type: str, count: int): for i in range(count): event = { 'type': 'event', 'event': event_type, 'body': {'index': i} } wrapper._on_dap_event(event) time.sleep(0.01) threads = [ threading.Thread(target=send_events, args=('stopped', 5)), threading.Thread(target=send_events, args=('output', 5)), threading.Thread(target=send_events, args=('continued', 5)), ] for t in threads: t.start() for t in threads: t.join() # Verify all events were routed correctly assert wrapper.event_queues['stopped'].qsize() == 5 assert wrapper.event_queues['output'].qsize() == 5 assert wrapper.event_queues['continued'].qsize() == 5 def test_event_order_preserved(self): """Test that event order is preserved within each queue.""" wrapper = DAPSyncWrapper() # Send multiple stopped events events = [] for i in range(5): event = { 'type': 'event', 'event': 'stopped', 'body': {'sequence': i} } events.append(event) wrapper._on_dap_event(event) # Retrieve in order for i, expected_event in enumerate(events): received = wrapper._wait_for_event('stopped', timeout=0.1) assert received is not None assert received['body']['sequence'] == i def test_no_event_leakage_between_types(self): """Test that events don't leak between different type queues.""" wrapper = DAPSyncWrapper() # Send a 'stopped' event stopped_event = {'type': 'event', 'event': 'stopped', 'body': {}} wrapper._on_dap_event(stopped_event) # Try to wait for 'continued' (different type) received = wrapper._wait_for_event('continued', timeout=0.1) # Should timeout, not get the stopped event assert received is None # The stopped event should still be in its queue assert wrapper.event_queues['stopped'].qsize() == 1 def test_general_queue_fallback_with_wrong_type(self): """Test general queue fallback correctly filters by type.""" wrapper = DAPSyncWrapper() # Send multiple unknown events event1 = {'type': 'event', 'event': 'custom1', 'body': {'id': 1}} event2 = {'type': 'event', 'event': 'custom2', 'body': {'id': 2}} event3 = {'type': 'event', 'event': 'custom1', 'body': {'id': 3}} wrapper._on_dap_event(event1) wrapper._on_dap_event(event2) wrapper._on_dap_event(event3) # Wait for custom1 events received1 = wrapper._wait_for_event('custom1', timeout=0.5) assert received1 is not None assert received1['body']['id'] == 1 received2 = wrapper._wait_for_event('custom1', timeout=0.5) assert received2 is not None assert received2['body']['id'] == 3 # custom2 should still be in queue assert wrapper.general_event_queue.qsize() == 1 class TestDAPEventRoutingPerformance: """Test performance improvements of dedicated queue pattern.""" def test_no_requeuing_for_dedicated_events(self): """Verify that dedicated queue events don't require re-queuing.""" wrapper = DAPSyncWrapper() # Send event test_event = {'type': 'event', 'event': 'initialized', 'body': {}} wrapper._on_dap_event(test_event) # Get the queue reference before retrieval queue_before = wrapper.event_queues['initialized'] initial_size = queue_before.qsize() # Retrieve event received = wrapper._wait_for_event('initialized', timeout=0.1) # Verify it was retrieved and queue is now empty assert received is not None assert queue_before.qsize() == 0 # No items should be put back def test_fast_event_retrieval(self): """Test that dedicated queue retrieval is fast.""" wrapper = DAPSyncWrapper() # Pre-populate queue test_event = {'type': 'event', 'event': 'stopped', 'body': {}} wrapper._on_dap_event(test_event) # Time the retrieval start = time.time() received = wrapper._wait_for_event('stopped', timeout=1.0) duration = time.time() - start # Should be nearly instantaneous assert received is not None assert duration < 0.01 # Less than 10ms if __name__ == '__main__': pytest.main([__file__, '-v'])

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kaina3/Debug-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server