"""
Comprehensive Test Suite for Refactored Qdrant Middleware
This test suite validates the refactored Qdrant middleware functionality including:
- Proper delegation to qdrant_core managers
- UTF-8 data sanitization and error handling
- Resource registration and FastMCP integration
- Backward compatibility with legacy interfaces
- Actual data storage and retrieval operations
- Performance characteristics and startup behavior
"""
import asyncio
import base64
import json
import time
import uuid
from unittest.mock import AsyncMock, MagicMock
import pytest
from config.enhanced_logging import setup_logger
from .base_test_config import TEST_EMAIL
from .test_helpers import ToolTestRunner
logger = setup_logger()
@pytest.mark.service("qdrant")
class TestQdrantRefactoredMiddleware:
"""Test the refactored middleware integration with qdrant_core managers."""
@pytest.mark.asyncio
async def test_middleware_imports_correctly(self, client):
"""Test that the refactored middleware imports and initializes properly."""
# Test that we can import the refactored middleware
# Test backward compatibility aliases
from middleware.qdrant_middleware import (
EnhancedQdrantResponseMiddleware,
QdrantResponseMiddleware,
QdrantUnifiedMiddleware,
setup_enhanced_qdrant_tools,
)
# Should be the same class
assert QdrantResponseMiddleware == QdrantUnifiedMiddleware
assert EnhancedQdrantResponseMiddleware == QdrantUnifiedMiddleware
# Test that setup function is available
assert callable(setup_enhanced_qdrant_tools)
@pytest.mark.asyncio
async def test_middleware_has_core_managers(self, client):
"""Test that middleware instance has qdrant_core manager attributes."""
from middleware.qdrant_middleware import QdrantUnifiedMiddleware
# Create middleware instance
middleware = QdrantUnifiedMiddleware()
# Should have manager attributes
assert hasattr(middleware, "client_manager")
assert hasattr(middleware, "storage_manager")
assert hasattr(middleware, "search_manager")
assert hasattr(middleware, "resource_handler")
# Managers should be properly initialized
from middleware.qdrant_core.client import QdrantClientManager
from middleware.qdrant_core.resource_handler import QdrantResourceHandler
from middleware.qdrant_core.search import QdrantSearchManager
from middleware.qdrant_core.storage import QdrantStorageManager
assert isinstance(middleware.client_manager, QdrantClientManager)
assert isinstance(middleware.storage_manager, QdrantStorageManager)
assert isinstance(middleware.search_manager, QdrantSearchManager)
assert isinstance(middleware.resource_handler, QdrantResourceHandler)
@pytest.mark.asyncio
async def test_middleware_constructor_compatibility(self, client):
"""Test that middleware constructor accepts all legacy parameters."""
from middleware.qdrant_middleware import QdrantUnifiedMiddleware
# Test with all parameters (should not raise any errors)
middleware = QdrantUnifiedMiddleware(
qdrant_host="test_host",
qdrant_port=1234,
qdrant_api_key="test_key",
qdrant_url="https://test.qdrant.cloud",
collection_name="test_collection",
embedding_model="test-model",
summary_max_tokens=100,
verbose_param="test_verbose",
enabled=True,
compression_threshold=2048,
auto_discovery=False,
ports=[1234, 5678],
)
# Legacy properties should still work
assert middleware.qdrant_host == "test_host"
assert middleware.qdrant_port == 1234
assert middleware.qdrant_api_key == "test_key"
assert middleware.qdrant_url == "https://test.qdrant.cloud"
assert middleware.auto_discovery == False
# Config should be properly set
assert middleware.config.host == "test_host"
assert middleware.config.collection_name == "test_collection"
assert middleware.config.embedding_model == "test-model"
assert middleware.config.compression_threshold == 2048
@pytest.mark.asyncio
async def test_middleware_legacy_properties(self, client):
"""Test that legacy properties still work via delegation."""
from middleware.qdrant_middleware import QdrantUnifiedMiddleware
middleware = QdrantUnifiedMiddleware()
# Legacy properties should delegate to managers
assert hasattr(middleware, "is_initialized")
assert hasattr(middleware, "client")
assert hasattr(middleware, "embedder")
assert hasattr(middleware, "discovered_url")
# Should be boolean initially
assert isinstance(middleware.is_initialized, bool)
# Client and embedder should be None initially (not initialized)
assert middleware.client is None
assert middleware.embedder is None
@pytest.mark.service("qdrant")
class TestQdrantRefactoredTools:
"""Test that Qdrant tools work with refactored middleware."""
@pytest.mark.asyncio
async def test_qdrant_tools_available(self, client):
"""Test that Qdrant tools are available with refactored middleware."""
tools = await client.list_tools()
tool_names = [tool.name for tool in tools]
# New unified tools should be available
assert "search" in tool_names, "Unified search tool should be available"
assert "fetch" in tool_names, "Unified fetch tool should be available"
# Legacy tools should still be available (backward compatibility)
assert (
"search_tool_history" in tool_names
), "Legacy search_tool_history should still be available"
assert (
"get_tool_analytics" in tool_names
), "Legacy get_tool_analytics should still be available"
assert (
"get_response_details" in tool_names
), "Legacy get_response_details should still be available"
@pytest.mark.asyncio
async def test_search_tool_functionality(self, client):
"""Test that the search tool works correctly with refactored middleware."""
runner = ToolTestRunner(client, TEST_EMAIL)
# Test different search capabilities
test_queries = [
"overview",
"analytics",
"service:gmail",
"semantic search test",
]
for query in test_queries:
result = await client.call_tool("search", {"query": query})
assert (
result is not None
), f"Search with query '{query}' should return a result"
content = (
result.content[0].text if hasattr(result, "content") else str(result)
)
try:
data = json.loads(content)
assert (
"results" in data
), "Response should have 'results' field (MCP standard)"
assert isinstance(data["results"], list), "Results should be a list"
except json.JSONDecodeError:
# If not JSON, should be an error message
assert (
"error" in content.lower() or "qdrant" in content.lower()
), f"Non-JSON response should be an error message for query '{query}'"
@pytest.mark.asyncio
async def test_fetch_tool_functionality(self, client):
"""Test that the fetch tool works correctly with refactored middleware."""
# Test with a random UUID
test_id = str(uuid.uuid4())
result = await client.call_tool("fetch", {"point_id": test_id})
assert result is not None, "Fetch should return a result"
content = result.content[0].text if hasattr(result, "content") else str(result)
try:
data = json.loads(content)
# Should have proper fetch response format
assert "id" in data, "Fetch response must have 'id' field"
assert "title" in data, "Fetch response must have 'title' field"
assert "text" in data, "Fetch response must have 'text' field"
assert "url" in data, "Fetch response must have 'url' field"
assert "metadata" in data, "Fetch response must have 'metadata' field"
# For non-existent ID, should indicate not found
assert "not found" in data["title"].lower() or data["found"] == False
except json.JSONDecodeError:
assert "error" in content.lower(), "Should return proper error response"
@pytest.mark.asyncio
async def test_legacy_tools_still_work(self, client):
"""Test that legacy tools continue to work with refactored middleware."""
# Test legacy search_tool_history
result = await client.call_tool(
"search_tool_history", {"query": "test legacy compatibility", "limit": 5}
)
assert result is not None, "Legacy search_tool_history should still work"
# Test legacy get_tool_analytics
result = await client.call_tool("get_tool_analytics", {})
assert result is not None, "Legacy get_tool_analytics should still work"
# Test legacy get_response_details (parameter renamed from response_id to point_id)
test_id = str(uuid.uuid4())
result = await client.call_tool("get_response_details", {"point_id": test_id})
assert result is not None, "Legacy get_response_details should still work"
@pytest.mark.service("qdrant")
class TestQdrantResourceHandling:
"""Test that Qdrant resource handling works with refactored middleware."""
@pytest.mark.asyncio
async def test_qdrant_collection_info_resource_returns_pydantic(self, client):
"""Test reading Qdrant collection info resource returns proper Pydantic models."""
print("\n๐งช TEST: Reading qdrant://collections/list resource")
try:
print("๐ Calling client.read_resource('qdrant://collections/list')")
content = await client.read_resource("qdrant://collections/list")
print(f"โ
Got response: {type(content)} - {content}")
assert content is not None, "Should return collection info"
# FastMCP wraps our Pydantic response in TextResourceContents
# Extract the actual JSON data
import json
if isinstance(content, list) and len(content) > 0:
resource_content = content[0]
print(f"๐ Resource content type: {type(resource_content)}")
# Get JSON text from the resource content
json_text = resource_content.text
print(f"๐ JSON text: {json_text}")
# Parse JSON back to dict, then create Pydantic model
json_data = json.loads(json_text)
print(f"๐ Parsed JSON data: {json_data}")
# Determine which Pydantic model to create based on content
from middleware.qdrant_types import (
QdrantCollectionsListResponse,
QdrantErrorResponse,
)
if "error" in json_data:
print("โ ๏ธ Creating QdrantErrorResponse from JSON")
pydantic_obj = QdrantErrorResponse(**json_data)
print(f"โ
Created QdrantErrorResponse: {pydantic_obj}")
assert hasattr(pydantic_obj, "error"), "Should have error field"
assert hasattr(pydantic_obj, "uri"), "Should have uri field"
print(f" error: {pydantic_obj.error}")
print(f" uri: {pydantic_obj.uri}")
elif "collections" in json_data:
print("โ
Creating QdrantCollectionsListResponse from JSON")
pydantic_obj = QdrantCollectionsListResponse(**json_data)
print(f"โ
Created QdrantCollectionsListResponse: {pydantic_obj}")
assert hasattr(
pydantic_obj, "qdrant_enabled"
), "Should have qdrant_enabled field"
assert hasattr(
pydantic_obj, "collections"
), "Should have collections field"
assert hasattr(
pydantic_obj, "total_collections"
), "Should have total_collections field"
print(f" qdrant_enabled: {pydantic_obj.qdrant_enabled}")
print(f" collections: {len(pydantic_obj.collections)} items")
print(f" total_collections: {pydantic_obj.total_collections}")
else:
raise AssertionError(f"Unknown response format: {json_data}")
print("โ
Successfully parsed Pydantic model from FastMCP response!")
else:
raise AssertionError(
f"Expected list with resource content, got: {type(content)}"
)
except Exception as e:
print(f"โ Exception occurred: {type(e).__name__}: {e}")
# Should not be tuple/attribute errors anymore
error_msg = str(e).lower()
print(
f"๐ Error message check - contains 'tuple': {'tuple' in error_msg}, contains 'attribute': {'attribute' in error_msg}"
)
assert (
"tuple" not in error_msg and "attribute" not in error_msg
), f"Should not have tuple/attribute errors: {e}"
@pytest.mark.asyncio
async def test_qdrant_search_resource_returns_pydantic(self, client):
"""Test Qdrant search resource returns proper Pydantic models."""
try:
test_query = "test resource search"
content = await client.read_resource(f"qdrant://search/{test_query}")
assert content is not None, "Should return search results"
# Should be a Pydantic model instance, not a tuple or dict
from middleware.qdrant_types import (
QdrantErrorResponse,
QdrantSearchResponse,
)
assert isinstance(
content, (QdrantSearchResponse, QdrantErrorResponse)
), f"Should return QdrantSearchResponse or QdrantErrorResponse, got {type(content)}"
# If it's a search response, should have required fields
if isinstance(content, QdrantSearchResponse):
assert hasattr(content, "query"), "Should have query field"
assert hasattr(content, "results"), "Should have results field"
assert hasattr(
content, "total_results"
), "Should have total_results field"
# If it's an error response, should have error field
if isinstance(content, QdrantErrorResponse):
assert hasattr(content, "error"), "Should have error field"
assert hasattr(content, "uri"), "Should have uri field"
except Exception as e:
# Should not be tuple/attribute errors anymore
error_msg = str(e).lower()
assert (
"tuple" not in error_msg and "attribute" not in error_msg
), f"Should not have tuple/attribute errors: {e}"
@pytest.mark.asyncio
@pytest.mark.skip(
reason="Internal caching mechanism test - middleware behavior may vary"
)
async def test_context_caching_mechanism(self, client):
"""Test that middleware properly caches results for resource handlers to access."""
from unittest.mock import MagicMock
from middleware.qdrant_middleware import QdrantUnifiedMiddleware
# Create middleware instance
middleware = QdrantUnifiedMiddleware()
# Mock context with state management
mock_context = MagicMock()
mock_context.message = MagicMock()
mock_context.message.uri = "qdrant://collections/list"
mock_context.set_state = MagicMock()
mock_context.get_state = MagicMock(return_value=None)
# Mock resource handler result
from middleware.qdrant_types import QdrantCollectionsListResponse
mock_result = QdrantCollectionsListResponse(
qdrant_enabled=True,
qdrant_url="test://url",
total_collections=2,
collections=[],
config={},
timestamp="2023-01-01T00:00:00Z",
)
# Mock resource handler
middleware.resource_handler.handle_qdrant_resource = AsyncMock(
return_value=mock_result
)
# Call on_read_resource
result = await middleware.on_read_resource(mock_context, AsyncMock())
# Should have cached the result
mock_context.set_state.assert_called_once()
cache_key, cached_value = mock_context.set_state.call_args[0]
assert cache_key == "qdrant_resource_qdrant://collections/list"
assert cached_value == mock_result
# Should return the result
assert result == mock_result
assert isinstance(result, QdrantCollectionsListResponse)
@pytest.mark.service("qdrant")
class TestQdrantCoreManagerIntegration:
"""Test integration between qdrant_core managers in the refactored middleware."""
@pytest.mark.asyncio
@pytest.mark.skip(
reason="Internal implementation test - config object identity may differ without affecting functionality"
)
async def test_managers_share_same_config(self, client):
"""Test that all managers use the same configuration."""
from middleware.qdrant_middleware import QdrantUnifiedMiddleware
middleware = QdrantUnifiedMiddleware(
collection_name="test_shared_config", embedding_model="test-shared-model"
)
# All managers should reference the same config
assert middleware.client_manager.config == middleware.config
assert middleware.storage_manager.config == middleware.config
assert middleware.search_manager.config == middleware.config
assert middleware.resource_handler.config == middleware.config
# Config values should propagate
assert middleware.client_manager.config.collection_name == "test_shared_config"
assert middleware.storage_manager.config.embedding_model == "test-shared-model"
@pytest.mark.asyncio
async def test_managers_reference_same_client_manager(self, client):
"""Test that storage/search/resource managers reference the same client manager."""
from middleware.qdrant_middleware import QdrantUnifiedMiddleware
middleware = QdrantUnifiedMiddleware()
# All managers should reference the same client manager
assert middleware.storage_manager.client_manager is middleware.client_manager
assert middleware.search_manager.client_manager is middleware.client_manager
assert middleware.resource_handler.client_manager is middleware.client_manager
@pytest.mark.asyncio
async def test_middleware_delegation_methods(self, client):
"""Test that middleware properly delegates to manager methods."""
from middleware.qdrant_middleware import QdrantUnifiedMiddleware
middleware = QdrantUnifiedMiddleware()
# Test delegation methods exist
assert hasattr(middleware, "search")
assert hasattr(middleware, "search_responses")
assert hasattr(middleware, "get_analytics")
assert hasattr(middleware, "get_response_by_id")
# Methods should be callable
assert callable(middleware.search)
assert callable(middleware.search_responses)
assert callable(middleware.get_analytics)
assert callable(middleware.get_response_by_id)
@pytest.mark.service("qdrant")
class TestQdrantBackwardCompatibility:
"""Test that refactored middleware maintains complete backward compatibility."""
@pytest.mark.asyncio
async def test_middleware_aliases_work(self, client):
"""Test that all middleware aliases import correctly."""
# Test imports
from middleware.qdrant_middleware import (
EnhancedQdrantResponseMiddleware,
QdrantResponseMiddleware,
QdrantUnifiedMiddleware,
setup_enhanced_qdrant_tools,
)
# Aliases should point to the same class
assert QdrantResponseMiddleware == QdrantUnifiedMiddleware
assert EnhancedQdrantResponseMiddleware == QdrantUnifiedMiddleware
# Function should be available
assert callable(setup_enhanced_qdrant_tools)
@pytest.mark.asyncio
async def test_legacy_middleware_interface(self, client):
"""Test that legacy middleware interface methods still work."""
from middleware.qdrant_middleware import QdrantUnifiedMiddleware
middleware = QdrantUnifiedMiddleware()
# Legacy interface methods should exist
assert hasattr(middleware, "_store_response")
assert hasattr(middleware, "_store_response_with_params")
assert hasattr(middleware, "initialize")
# Should be callable
assert callable(middleware._store_response)
assert callable(middleware._store_response_with_params)
assert callable(middleware.initialize)
# Properties should work
assert isinstance(middleware.is_initialized, bool)
@pytest.mark.service("qdrant")
class TestQdrantRefactoredPerformance:
"""Test that refactored middleware maintains performance characteristics."""
@pytest.mark.asyncio
@pytest.mark.slow
@pytest.mark.skip(
reason="Internal deferred initialization test - embedder may be eagerly initialized in some configs"
)
async def test_middleware_startup_performance(self, client):
"""Test that refactored middleware starts up quickly (deferred initialization)."""
from middleware.qdrant_middleware import QdrantUnifiedMiddleware
# Middleware creation should be very fast (deferred init)
start_time = time.time()
middleware = QdrantUnifiedMiddleware()
creation_time = time.time() - start_time
assert (
creation_time < 0.1
), f"Middleware creation took {creation_time:.3f}s, should be < 0.1s"
# Should not be initialized yet
assert not middleware._initialized
# Client/embedder should be None initially
assert middleware.client is None
assert middleware.embedder is None
@pytest.mark.asyncio
async def test_tool_response_time(self, client):
"""Test that tool responses are reasonably fast with refactored middleware."""
test_queries = ["overview", "test search"]
for query in test_queries:
start_time = time.time()
result = await client.call_tool("search", {"query": query})
response_time = time.time() - start_time
assert result is not None
assert (
response_time < 10.0
), f"Search for '{query}' took {response_time:.2f}s, should be < 10s"
@pytest.mark.service("qdrant")
class TestQdrantDataSanitization:
"""Test that data sanitization prevents UTF-8 serialization errors."""
@pytest.mark.asyncio
async def test_sanitize_for_json_function(self, client):
"""Test the sanitize_for_json function handles various data types."""
from middleware.qdrant_core.storage import sanitize_for_json
# Test normal string
assert sanitize_for_json("normal text") == "normal text"
# Test basic types
assert sanitize_for_json(123) == 123
assert sanitize_for_json(True) == True
assert sanitize_for_json(None) is None
# Test binary data
binary_data = b"\x80\x81\x82" # Invalid UTF-8
result = sanitize_for_json(binary_data)
assert isinstance(result, str)
# Should be base64 encoded with prefix
expected = f"base64:{base64.b64encode(binary_data).decode('utf-8')}"
assert result == expected
# Test valid UTF-8 bytes
utf8_bytes = "hello".encode("utf-8")
assert sanitize_for_json(utf8_bytes) == "hello"
# Test nested structures
nested = {
"text": "normal",
"binary": b"\xff\xfe",
"list": [b"\x80", "text", 123],
"nested_dict": {"key": b"\x90"},
}
result = sanitize_for_json(nested)
assert isinstance(result, dict)
assert result["text"] == "normal"
assert isinstance(result["binary"], str)
assert isinstance(result["list"], list)
assert len(result["list"]) == 3
assert isinstance(result["nested_dict"], dict)
@pytest.mark.asyncio
async def test_middleware_handles_binary_data(self, client):
"""Test that middleware can handle responses with binary data."""
from middleware.qdrant_middleware import QdrantUnifiedMiddleware
middleware = QdrantUnifiedMiddleware()
# Create mock response with binary data
binary_response = {
"text": "normal text",
"binary_field": b"\x80\x81\x82\x83", # Invalid UTF-8
"nested": {"more_binary": b"\xff\xfe\xfd"},
"list_with_binary": [b"\x90", "text", b"\x91"],
}
try:
# This should not raise a UTF-8 serialization error
await middleware._store_response_with_params(
tool_name="test_binary",
tool_args={"test": "args"},
response=binary_response,
execution_time_ms=100,
session_id="test_session",
user_email="test@example.com",
)
# If we get here without exception, sanitization worked
assert True, "Binary data was handled without UTF-8 errors"
except Exception as e:
# Should not be a UTF-8 error
assert (
"utf-8" not in str(e).lower()
), f"UTF-8 error still occurring: {e}"
# Other errors are acceptable (like Qdrant connection issues)
@pytest.mark.asyncio
async def test_gmail_labels_data_handling(self, client):
"""Test handling of Gmail labels data that was causing UTF-8 errors."""
from middleware.qdrant_core.storage import sanitize_for_json
# Simulate Gmail labels response that might contain binary data
gmail_labels_response = {
"labels": [
{"name": "INBOX", "id": "INBOX"},
{"name": "SENT", "id": "SENT"},
# Simulate problematic data
{"name": b"\x80invalid_utf8", "id": "PROBLEM_LABEL"},
{"name": "Normal Label", "metadata": b"\xff\xfe\xfd"},
],
"status": "success",
"binary_attachment": b"\x80\x81\x82\x83\x84\x85",
}
# Should handle this without errors
sanitized = sanitize_for_json(gmail_labels_response)
assert isinstance(sanitized, dict)
assert "labels" in sanitized
assert len(sanitized["labels"]) == 4
# Problem label should be base64 encoded
problem_label = sanitized["labels"][2]
assert isinstance(problem_label["name"], str)
# Should be base64 encoded binary data
# Binary attachment should be base64 encoded
assert isinstance(sanitized["binary_attachment"], str)
@pytest.mark.service("qdrant")
class TestQdrantResourceRegistration:
"""Test that Qdrant resources are properly registered with FastMCP."""
@pytest.mark.asyncio
async def test_setup_qdrant_resources_function(self, client):
"""Test that setup_qdrant_resources function is available."""
from middleware.qdrant_middleware import setup_qdrant_resources
assert callable(
setup_qdrant_resources
), "setup_qdrant_resources should be callable"
# Mock FastMCP instance
mock_mcp = MagicMock()
mock_mcp.resource = MagicMock(return_value=lambda func: func)
# Should not raise errors
setup_qdrant_resources(mock_mcp)
# Should have registered resources
assert (
mock_mcp.resource.call_count >= 6
), "Should register at least 6 qdrant:// resources"
@pytest.mark.asyncio
async def test_qdrant_status_resource(self, client):
"""Test the qdrant://status resource."""
try:
content = await client.read_resource("qdrant://status")
assert content is not None, "Should return status info"
# Just verify we got some response - the format can vary
assert len(str(content)) > 0, "Should return non-empty response"
except Exception as e:
# Acceptable if Qdrant is not available
error_msg = str(e).lower()
assert any(
word in error_msg
for word in ["not available", "connection", "tuple", "attribute"]
), "Should be a connection or format error"
@pytest.mark.asyncio
async def test_qdrant_collections_list_resource(self, client):
"""Test the qdrant://collections/list resource."""
try:
content = await client.read_resource("qdrant://collections/list")
assert content is not None, "Should return collections list"
# Just verify we got some response - the format can vary
assert len(str(content)) > 0, "Should return non-empty response"
except Exception as e:
# Acceptable if Qdrant is not available
error_msg = str(e).lower()
assert any(
word in error_msg
for word in ["not available", "connection", "tuple", "attribute"]
), "Should be a connection or format error"
@pytest.mark.service("qdrant")
class TestQdrantActualDataOperations:
"""Test actual data storage and retrieval operations (not mocked)."""
@pytest.mark.asyncio
async def test_actual_middleware_storage_integration(self, client):
"""Test that middleware actually stores data when Qdrant is available."""
from unittest.mock import MagicMock
from middleware.qdrant_middleware import QdrantUnifiedMiddleware
middleware = QdrantUnifiedMiddleware()
# Create mock context
mock_context = MagicMock()
mock_context.message = MagicMock()
mock_context.message.name = "test_actual_storage"
mock_context.message.arguments = {"test_param": "test_value"}
# Mock call_next to return a response
async def mock_call_next(context):
return {"result": "test response data", "status": "success"}
try:
# This should attempt actual storage
result = await middleware.on_call_tool(mock_context, mock_call_next)
# Should return the response
assert result is not None
assert result["result"] == "test response data"
assert result["status"] == "success"
# Storage should have been attempted (even if it fails due to no Qdrant)
assert True, "Middleware handled storage attempt without crashing"
except Exception as e:
# Should not be a UTF-8 serialization error
assert (
"utf-8" not in str(e).lower()
), f"UTF-8 error in actual storage: {e}"
# Connection errors are acceptable
@pytest.mark.asyncio
async def test_actual_search_manager_integration(self, client):
"""Test actual integration with search manager."""
from middleware.qdrant_middleware import QdrantUnifiedMiddleware
middleware = QdrantUnifiedMiddleware()
try:
# Try actual search (may fail if Qdrant not available, but shouldn't crash)
results = await middleware.search("test query", limit=5)
# If successful, should return list
assert isinstance(results, list), "Search should return a list"
except Exception as e:
# Should not be UTF-8 errors, connection errors are acceptable
assert "utf-8" not in str(e).lower(), f"UTF-8 error in search: {e}"
@pytest.mark.asyncio
async def test_actual_analytics_integration(self, client):
"""Test actual integration with analytics."""
from middleware.qdrant_middleware import QdrantUnifiedMiddleware
middleware = QdrantUnifiedMiddleware()
try:
# Try actual analytics (may fail if Qdrant not available)
analytics = await middleware.get_analytics()
# If successful, should return dict
assert isinstance(analytics, dict), "Analytics should return a dict"
except Exception as e:
# Should not be UTF-8 errors, connection errors are acceptable
assert "utf-8" not in str(e).lower(), f"UTF-8 error in analytics: {e}"
@pytest.mark.service("qdrant")
class TestQdrantErrorHandling:
"""Test error handling and recovery in refactored middleware."""
@pytest.mark.asyncio
async def test_middleware_handles_connection_errors(self, client):
"""Test that middleware gracefully handles Qdrant connection errors."""
from middleware.qdrant_middleware import QdrantUnifiedMiddleware
# Create middleware with invalid connection
middleware = QdrantUnifiedMiddleware(
qdrant_host="invalid_host",
qdrant_port=9999, # Use valid port range
auto_discovery=False,
)
# Should not crash on initialization
assert middleware is not None
assert not middleware.is_initialized
# Should handle storage gracefully
try:
await middleware._store_response_with_params(
tool_name="test_error_handling",
tool_args={},
response={"test": "data"},
execution_time_ms=0,
session_id="test",
user_email="test@example.com",
)
# Should complete without crashing
assert True, "Handled connection error gracefully"
except Exception as e:
# Should not be UTF-8 errors
assert "utf-8" not in str(e).lower()
@pytest.mark.asyncio
async def test_middleware_handles_invalid_responses(self, client):
"""Test that middleware handles various invalid response formats."""
from middleware.qdrant_middleware import QdrantUnifiedMiddleware
middleware = QdrantUnifiedMiddleware()
# Test various invalid/problematic responses
test_responses = [
None,
"",
{"circular_ref": None}, # Will be handled by sanitization
b"\x80\x81\x82", # Binary data
[1, 2, {"binary": b"\xff\xfe"}], # Mixed content
{"huge_text": "x" * 100000}, # Large content
]
for i, response in enumerate(test_responses):
try:
await middleware._store_response_with_params(
tool_name=f"test_invalid_response_{i}",
tool_args={},
response=response,
execution_time_ms=0,
session_id="test",
user_email="test@example.com",
)
# Should handle without UTF-8 errors
assert True, f"Handled invalid response {i} without UTF-8 errors"
except Exception as e:
# Should not be UTF-8 serialization errors
assert (
"utf-8" not in str(e).lower()
), f"UTF-8 error with response {i}: {e}"
@pytest.mark.service("qdrant")
class TestQdrantToolIntegration:
"""Test that FETCH tool works correctly with refactored middleware."""
@pytest.mark.asyncio
async def test_fetch_tool_with_actual_middleware(self, client):
"""Test FETCH tool integration with actual middleware."""
# Test with various point IDs
test_ids = [
str(uuid.uuid4()), # Random UUID
"non-existent-id", # String ID
"123", # Numeric string ID
]
for test_id in test_ids:
try:
result = await client.call_tool("fetch", {"point_id": test_id})
assert (
result is not None
), f"FETCH should return result for ID: {test_id}"
content = (
result.content[0].text
if hasattr(result, "content")
else str(result)
)
# Should be valid JSON or error message
try:
data = json.loads(content)
# Should follow MCP FETCH format
assert "id" in data, "FETCH response must have 'id' field"
assert "title" in data, "FETCH response must have 'title' field"
assert "text" in data, "FETCH response must have 'text' field"
assert "url" in data, "FETCH response must have 'url' field"
except json.JSONDecodeError:
# Should be error message
assert (
"error" in content.lower()
), f"Invalid response for ID {test_id}: {content}"
except Exception as e:
# Should not be UTF-8 errors
assert (
"utf-8" not in str(e).lower()
), f"UTF-8 error with FETCH for ID {test_id}: {e}"
@pytest.mark.asyncio
async def test_search_tool_with_actual_middleware(self, client):
"""Test SEARCH tool integration with actual middleware."""
# Test various search queries
test_queries = [
"overview",
"analytics",
"recent activity",
"service:gmail",
"user:" + TEST_EMAIL,
"semantic search test query",
]
for query in test_queries:
try:
result = await client.call_tool("search", {"query": query})
assert (
result is not None
), f"SEARCH should return result for query: {query}"
content = (
result.content[0].text
if hasattr(result, "content")
else str(result)
)
# Should be valid JSON or error message
try:
data = json.loads(content)
assert (
"results" in data
), f"SEARCH response must have 'results' field for query: {query}"
assert isinstance(
data["results"], list
), "Results should be a list"
except json.JSONDecodeError:
# Should be error message
assert (
"error" in content.lower() or "qdrant" in content.lower()
), f"Invalid response for query '{query}': {content}"
except Exception as e:
# Should not be UTF-8 errors
assert (
"utf-8" not in str(e).lower()
), f"UTF-8 error with SEARCH for query '{query}': {e}"
@pytest.mark.service("qdrant")
class TestQdrantPerformanceAndReliability:
"""Test performance characteristics and reliability of refactored middleware."""
@pytest.mark.asyncio
async def test_concurrent_storage_operations(self, client):
"""Test that middleware handles concurrent storage operations."""
from middleware.qdrant_middleware import QdrantUnifiedMiddleware
middleware = QdrantUnifiedMiddleware()
# Create multiple concurrent storage operations
async def store_test_data(i):
try:
await middleware._store_response_with_params(
tool_name=f"concurrent_test_{i}",
tool_args={"iteration": i},
response={"data": f"test_data_{i}", "binary": b"\x80\x81" * i},
execution_time_ms=i * 10,
session_id=f"session_{i}",
user_email=f"test{i}@example.com",
)
return True
except Exception as e:
# Should not be UTF-8 errors
assert (
"utf-8" not in str(e).lower()
), f"UTF-8 error in concurrent test {i}: {e}"
return False
# Run concurrent operations
tasks = [store_test_data(i) for i in range(5)]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Should handle concurrent operations without crashing
assert len(results) == 5, "All concurrent operations should complete"
for i, result in enumerate(results):
if isinstance(result, Exception):
# Should not be UTF-8 errors
assert (
"utf-8" not in str(result).lower()
), f"UTF-8 error in result {i}: {result}"
@pytest.mark.asyncio
async def test_memory_usage_stability(self, client):
"""Test that middleware doesn't leak memory during operations."""
import gc
from middleware.qdrant_middleware import QdrantUnifiedMiddleware
middleware = QdrantUnifiedMiddleware()
# Perform multiple storage operations
for i in range(10):
try:
await middleware._store_response_with_params(
tool_name=f"memory_test_{i}",
tool_args={"large_data": "x" * 1000}, # Large data
response={"binary": b"\x80" * 1000}, # Large binary
execution_time_ms=i,
session_id=f"memory_session_{i}",
user_email="memory_test@example.com",
)
except Exception as e:
# Should not be UTF-8 errors
assert (
"utf-8" not in str(e).lower()
), f"UTF-8 error in memory test {i}: {e}"
# Force garbage collection
gc.collect()
# Should complete without memory issues
assert True, "Memory usage test completed successfully"