# test_dap_event_routing.py
"""
Unit tests for DAP event routing mechanism.
Tests the improved event routing system that uses dedicated queues
for different event types to avoid inefficient re-queuing.
"""
import queue
import threading
import time
from unittest.mock import Mock, patch
import pytest
from mcp_debug_tool.dap_wrapper import DAPSyncWrapper
class TestDAPEventRouting:
    """Test the dedicated queue event routing pattern."""

    def test_event_queues_initialized(self) -> None:
        """Test that dedicated event queues are created on initialization."""
        wrapper = DAPSyncWrapper()

        # Every event type with a dedicated queue must exist up front.
        expected_queues = [
            'initialized', 'stopped', 'continued', 'terminated',
            'exited', 'output', 'breakpoint', 'thread',
        ]
        for event_type in expected_queues:
            assert event_type in wrapper.event_queues
            assert isinstance(wrapper.event_queues[event_type], queue.Queue)

        # The catch-all queue for every other event type must exist as well.
        assert isinstance(wrapper.general_event_queue, queue.Queue)

    def test_known_event_routed_to_dedicated_queue(self) -> None:
        """Test that known events are routed to their dedicated queues."""
        wrapper = DAPSyncWrapper()
        test_event = {
            'type': 'event',
            'event': 'stopped',
            'body': {'reason': 'breakpoint', 'threadId': 1},
        }

        wrapper._on_dap_event(test_event)

        # Only the 'stopped' queue may have received the event.
        assert wrapper.event_queues['stopped'].qsize() == 1
        assert wrapper.general_event_queue.qsize() == 0

        # The event must come back out unchanged.
        retrieved = wrapper.event_queues['stopped'].get_nowait()
        assert retrieved == test_event

    def test_unknown_event_routed_to_general_queue(self) -> None:
        """Test that unknown events are routed to the general queue."""
        wrapper = DAPSyncWrapper()
        test_event = {
            'type': 'event',
            'event': 'customUnknownEvent',
            'body': {},
        }

        wrapper._on_dap_event(test_event)

        # The event lands in the general queue ...
        assert wrapper.general_event_queue.qsize() == 1
        # ... and none of the dedicated queues was touched.
        for queue_obj in wrapper.event_queues.values():
            assert queue_obj.qsize() == 0

        retrieved = wrapper.general_event_queue.get_nowait()
        assert retrieved == test_event

    def test_multiple_events_different_types(self) -> None:
        """Test routing multiple events of different types."""
        wrapper = DAPSyncWrapper()
        events = [
            {'type': 'event', 'event': 'initialized', 'body': {}},
            {'type': 'event', 'event': 'stopped', 'body': {}},
            {'type': 'event', 'event': 'output', 'body': {'output': 'test'}},
            {'type': 'event', 'event': 'terminated', 'body': {}},
        ]

        for event in events:
            wrapper._on_dap_event(event)

        # Each event must sit in the queue matching its own type.
        assert wrapper.event_queues['initialized'].qsize() == 1
        assert wrapper.event_queues['stopped'].qsize() == 1
        assert wrapper.event_queues['output'].qsize() == 1
        assert wrapper.event_queues['terminated'].qsize() == 1
        assert wrapper.general_event_queue.qsize() == 0

    def test_wait_for_event_from_dedicated_queue(self) -> None:
        """Test waiting for an event that goes to a dedicated queue."""
        wrapper = DAPSyncWrapper()
        test_event = {'type': 'event', 'event': 'initialized', 'body': {}}

        def send_event() -> None:
            # Deliver the event from another thread after a short delay.
            time.sleep(0.1)
            wrapper._on_dap_event(test_event)

        sender = threading.Thread(target=send_event)
        sender.start()
        try:
            # Monotonic clock: wall-clock adjustments cannot skew the duration.
            start = time.monotonic()
            received = wrapper._wait_for_event('initialized', timeout=2.0)
            duration = time.monotonic() - start
        finally:
            # Always reap the helper thread, even if the wait raised.
            sender.join()

        assert received is not None
        assert received == test_event
        assert duration < 0.5  # Should return well before the 2.0s timeout

    def test_wait_for_event_timeout(self) -> None:
        """Test that waiting times out correctly when event doesn't arrive."""
        wrapper = DAPSyncWrapper()

        # No event is ever sent, so the wait has to run into its timeout.
        start = time.monotonic()
        received = wrapper._wait_for_event('stopped', timeout=0.5)
        duration = time.monotonic() - start

        assert received is None
        assert 0.4 < duration < 0.7  # Should timeout around 0.5s

    def test_wait_for_unknown_event_type(self) -> None:
        """Test waiting for an event type not in dedicated queues."""
        wrapper = DAPSyncWrapper()
        test_event = {'type': 'event', 'event': 'customEvent', 'body': {}}

        def send_event() -> None:
            time.sleep(0.1)
            wrapper._on_dap_event(test_event)

        sender = threading.Thread(target=send_event)
        sender.start()
        try:
            # 'customEvent' has no dedicated queue, so this exercises the
            # general-queue path.
            received = wrapper._wait_for_event('customEvent', timeout=2.0)
        finally:
            # Always reap the helper thread, even if the wait raised.
            sender.join()

        assert received is not None
        assert received == test_event

    def test_concurrent_event_routing(self) -> None:
        """Test that concurrent event routing works correctly."""
        wrapper = DAPSyncWrapper()

        def send_events(event_type: str, count: int) -> None:
            # Emit `count` events of one type, pausing briefly between them
            # so the three sender threads interleave.
            for i in range(count):
                event = {
                    'type': 'event',
                    'event': event_type,
                    'body': {'index': i},
                }
                wrapper._on_dap_event(event)
                time.sleep(0.01)

        threads = [
            threading.Thread(target=send_events, args=('stopped', 5)),
            threading.Thread(target=send_events, args=('output', 5)),
            threading.Thread(target=send_events, args=('continued', 5)),
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # All senders have finished, so the queue sizes are stable now.
        assert wrapper.event_queues['stopped'].qsize() == 5
        assert wrapper.event_queues['output'].qsize() == 5
        assert wrapper.event_queues['continued'].qsize() == 5

    def test_event_order_preserved(self) -> None:
        """Test that event order is preserved within each queue."""
        wrapper = DAPSyncWrapper()

        # Queue several 'stopped' events carrying increasing sequence numbers.
        events = [
            {'type': 'event', 'event': 'stopped', 'body': {'sequence': i}}
            for i in range(5)
        ]
        for event in events:
            wrapper._on_dap_event(event)

        # They must come back in exactly the order they were sent.
        for i, expected_event in enumerate(events):
            received = wrapper._wait_for_event('stopped', timeout=0.1)
            assert received is not None
            assert received['body']['sequence'] == i
            assert received == expected_event

    def test_no_event_leakage_between_types(self) -> None:
        """Test that events don't leak between different type queues."""
        wrapper = DAPSyncWrapper()
        stopped_event = {'type': 'event', 'event': 'stopped', 'body': {}}
        wrapper._on_dap_event(stopped_event)

        # Waiting for a different type must time out rather than hand back
        # the queued 'stopped' event.
        received = wrapper._wait_for_event('continued', timeout=0.1)
        assert received is None

        # The 'stopped' event is still waiting in its own queue.
        assert wrapper.event_queues['stopped'].qsize() == 1

    def test_general_queue_fallback_with_wrong_type(self) -> None:
        """Test general queue fallback correctly filters by type."""
        wrapper = DAPSyncWrapper()

        # Three unknown events: two 'custom1' with a 'custom2' in between.
        event1 = {'type': 'event', 'event': 'custom1', 'body': {'id': 1}}
        event2 = {'type': 'event', 'event': 'custom2', 'body': {'id': 2}}
        event3 = {'type': 'event', 'event': 'custom1', 'body': {'id': 3}}
        wrapper._on_dap_event(event1)
        wrapper._on_dap_event(event2)
        wrapper._on_dap_event(event3)

        # Both 'custom1' events are returned in order, skipping 'custom2'.
        received1 = wrapper._wait_for_event('custom1', timeout=0.5)
        assert received1 is not None
        assert received1['body']['id'] == 1

        received2 = wrapper._wait_for_event('custom1', timeout=0.5)
        assert received2 is not None
        assert received2['body']['id'] == 3

        # The skipped 'custom2' event must still be in the general queue.
        assert wrapper.general_event_queue.qsize() == 1
class TestDAPEventRoutingPerformance:
    """Test performance improvements of dedicated queue pattern."""

    def test_no_requeuing_for_dedicated_events(self) -> None:
        """Verify that dedicated queue events don't require re-queuing."""
        wrapper = DAPSyncWrapper()
        test_event = {'type': 'event', 'event': 'initialized', 'body': {}}
        wrapper._on_dap_event(test_event)

        # Hold on to the queue object and record its size before retrieval.
        queue_before = wrapper.event_queues['initialized']
        initial_size = queue_before.qsize()

        received = wrapper._wait_for_event('initialized', timeout=0.1)

        # Exactly one event was queued, it was handed out, and nothing was
        # put back afterwards.
        assert initial_size == 1
        assert received is not None
        assert queue_before.qsize() == 0

    def test_fast_event_retrieval(self) -> None:
        """Test that dedicated queue retrieval is fast."""
        wrapper = DAPSyncWrapper()

        # Pre-populate the queue so the wait never has to block.
        test_event = {'type': 'event', 'event': 'stopped', 'body': {}}
        wrapper._on_dap_event(test_event)

        # perf_counter is a high-resolution clock, which matters for a
        # threshold as small as 10ms.
        start = time.perf_counter()
        received = wrapper._wait_for_event('stopped', timeout=1.0)
        duration = time.perf_counter() - start

        assert received is not None
        assert duration < 0.01  # Less than 10ms
if __name__ == '__main__':
    # pytest.main returns the exit code instead of exiting; propagate it so
    # running this file as a script reports test failures to the caller.
    raise SystemExit(pytest.main([__file__, '-v']))