"""Tests for transport layer with interceptor integration."""
from unittest.mock import Mock
import httpx
import pytest
from crewai.llms.hooks.base import BaseInterceptor
from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport
class TrackingInterceptor(BaseInterceptor[httpx.Request, httpx.Response]):
"""Test interceptor that tracks all calls."""
def __init__(self) -> None:
"""Initialize tracking lists."""
self.outbound_calls: list[httpx.Request] = []
self.inbound_calls: list[httpx.Response] = []
self.async_outbound_calls: list[httpx.Request] = []
self.async_inbound_calls: list[httpx.Response] = []
def on_outbound(self, message: httpx.Request) -> httpx.Request:
"""Track outbound calls and add header.
Args:
message: The outbound request.
Returns:
Modified request with tracking header.
"""
self.outbound_calls.append(message)
message.headers["X-Intercepted-Sync"] = "true"
return message
def on_inbound(self, message: httpx.Response) -> httpx.Response:
"""Track inbound calls.
Args:
message: The inbound response.
Returns:
The response with tracking header.
"""
self.inbound_calls.append(message)
message.headers["X-Response-Intercepted-Sync"] = "true"
return message
async def aon_outbound(self, message: httpx.Request) -> httpx.Request:
"""Track async outbound calls and add header.
Args:
message: The outbound request.
Returns:
Modified request with tracking header.
"""
self.async_outbound_calls.append(message)
message.headers["X-Intercepted-Async"] = "true"
return message
async def aon_inbound(self, message: httpx.Response) -> httpx.Response:
"""Track async inbound calls.
Args:
message: The inbound response.
Returns:
The response with tracking header.
"""
self.async_inbound_calls.append(message)
message.headers["X-Response-Intercepted-Async"] = "true"
return message
class TestHTTPTransport:
"""Test suite for sync HTTPTransport with interceptor."""
def test_transport_instantiation(self) -> None:
"""Test that transport can be instantiated with interceptor."""
interceptor = TrackingInterceptor()
transport = HTTPTransport(interceptor=interceptor)
assert transport.interceptor is interceptor
def test_transport_requires_interceptor(self) -> None:
"""Test that transport requires interceptor parameter."""
# HTTPTransport requires an interceptor parameter
with pytest.raises(TypeError):
HTTPTransport()
def test_interceptor_called_on_request(self) -> None:
"""Test that interceptor hooks are called during request handling."""
interceptor = TrackingInterceptor()
transport = HTTPTransport(interceptor=interceptor)
# Create a mock parent transport that returns a response
mock_response = httpx.Response(200, json={"success": True})
mock_parent_handle = Mock(return_value=mock_response)
# Monkey-patch the parent's handle_request
original_handle = httpx.HTTPTransport.handle_request
httpx.HTTPTransport.handle_request = mock_parent_handle
try:
request = httpx.Request("GET", "https://api.example.com/test")
response = transport.handle_request(request)
# Verify interceptor was called
assert len(interceptor.outbound_calls) == 1
assert len(interceptor.inbound_calls) == 1
assert interceptor.outbound_calls[0] is request
assert interceptor.inbound_calls[0] is response
# Verify headers were added
assert "X-Intercepted-Sync" in request.headers
assert request.headers["X-Intercepted-Sync"] == "true"
assert "X-Response-Intercepted-Sync" in response.headers
assert response.headers["X-Response-Intercepted-Sync"] == "true"
finally:
# Restore original method
httpx.HTTPTransport.handle_request = original_handle
class TestAsyncHTTPTransport:
"""Test suite for async AsyncHTTPransport with interceptor."""
def test_async_transport_instantiation(self) -> None:
"""Test that async transport can be instantiated with interceptor."""
interceptor = TrackingInterceptor()
transport = AsyncHTTPTransport(interceptor=interceptor)
assert transport.interceptor is interceptor
def test_async_transport_requires_interceptor(self) -> None:
"""Test that async transport requires interceptor parameter."""
# AsyncHTTPransport requires an interceptor parameter
with pytest.raises(TypeError):
AsyncHTTPTransport()
@pytest.mark.asyncio
async def test_async_interceptor_called_on_request(self) -> None:
"""Test that async interceptor hooks are called during request handling."""
interceptor = TrackingInterceptor()
transport = AsyncHTTPTransport(interceptor=interceptor)
# Create a mock parent transport that returns a response
mock_response = httpx.Response(200, json={"success": True})
async def mock_handle(*args, **kwargs):
return mock_response
mock_parent_handle = Mock(side_effect=mock_handle)
# Monkey-patch the parent's handle_async_request
original_handle = httpx.AsyncHTTPTransport.handle_async_request
httpx.AsyncHTTPTransport.handle_async_request = mock_parent_handle
try:
request = httpx.Request("GET", "https://api.example.com/test")
response = await transport.handle_async_request(request)
# Verify async interceptor was called
assert len(interceptor.async_outbound_calls) == 1
assert len(interceptor.async_inbound_calls) == 1
assert interceptor.async_outbound_calls[0] is request
assert interceptor.async_inbound_calls[0] is response
# Verify sync interceptor was NOT called
assert len(interceptor.outbound_calls) == 0
assert len(interceptor.inbound_calls) == 0
# Verify async headers were added
assert "X-Intercepted-Async" in request.headers
assert request.headers["X-Intercepted-Async"] == "true"
assert "X-Response-Intercepted-Async" in response.headers
assert response.headers["X-Response-Intercepted-Async"] == "true"
finally:
# Restore original method
httpx.AsyncHTTPTransport.handle_async_request = original_handle
class TestTransportIntegration:
"""Test suite for transport integration scenarios."""
def test_multiple_requests_same_interceptor(self) -> None:
"""Test that multiple requests through same interceptor are tracked."""
interceptor = TrackingInterceptor()
transport = HTTPTransport(interceptor=interceptor)
mock_response = httpx.Response(200)
mock_parent_handle = Mock(return_value=mock_response)
original_handle = httpx.HTTPTransport.handle_request
httpx.HTTPTransport.handle_request = mock_parent_handle
try:
# Make multiple requests
requests = [
httpx.Request("GET", "https://api.example.com/1"),
httpx.Request("POST", "https://api.example.com/2"),
httpx.Request("PUT", "https://api.example.com/3"),
]
for req in requests:
transport.handle_request(req)
# Verify all requests were intercepted
assert len(interceptor.outbound_calls) == 3
assert len(interceptor.inbound_calls) == 3
assert interceptor.outbound_calls == requests
finally:
httpx.HTTPTransport.handle_request = original_handle
@pytest.mark.asyncio
async def test_multiple_async_requests_same_interceptor(self) -> None:
"""Test that multiple async requests through same interceptor are tracked."""
interceptor = TrackingInterceptor()
transport = AsyncHTTPTransport(interceptor=interceptor)
mock_response = httpx.Response(200)
async def mock_handle(*args, **kwargs):
return mock_response
mock_parent_handle = Mock(side_effect=mock_handle)
original_handle = httpx.AsyncHTTPTransport.handle_async_request
httpx.AsyncHTTPTransport.handle_async_request = mock_parent_handle
try:
# Make multiple async requests
requests = [
httpx.Request("GET", "https://api.example.com/1"),
httpx.Request("POST", "https://api.example.com/2"),
httpx.Request("DELETE", "https://api.example.com/3"),
]
for req in requests:
await transport.handle_async_request(req)
# Verify all requests were intercepted
assert len(interceptor.async_outbound_calls) == 3
assert len(interceptor.async_inbound_calls) == 3
assert interceptor.async_outbound_calls == requests
finally:
httpx.AsyncHTTPTransport.handle_async_request = original_handle