"""
Ultimate integration tests to achieve 70%+ integration coverage.
These tests focus on the highest-impact coverage areas including API client,
cache systems, MCP tools, utilities, and optimization components with working
implementations that avoid common fixture and import issues.
"""
import pytest
import asyncio
import tempfile
import json
import time
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
from src.api.client import KeapApiService
from src.cache.manager import CacheManager
from src.cache.persistent_manager import PersistentCacheManager
from src.mcp.server import KeapMCPServer
class TestUltimateCoverageIntegration:
"""Ultimate integration tests for maximum coverage."""
@pytest.fixture
def temp_db_path(self):
"""Create temporary database for testing."""
with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
db_path = f.name
yield db_path
try:
Path(db_path).unlink()
except FileNotFoundError:
pass
@pytest.fixture
def sample_data(self):
"""Sample data for comprehensive testing."""
return {
"contacts": [
{
"id": 1, "given_name": "John", "family_name": "Doe",
"email_addresses": [{"email": "john@example.com", "field": "EMAIL1"}],
"tag_ids": [10, 20], "custom_fields": [{"id": 7, "content": "VIP"}]
},
{
"id": 2, "given_name": "Jane", "family_name": "Smith",
"email_addresses": [{"email": "jane@example.com", "field": "EMAIL1"}],
"tag_ids": [10], "custom_fields": [{"id": 7, "content": "Regular"}]
}
],
"tags": [
{"id": 10, "name": "Customer", "description": "Customer tag"},
{"id": 20, "name": "VIP", "description": "VIP customer"}
]
}
@pytest.mark.asyncio
async def test_api_client_comprehensive_integration(self, sample_data):
"""Test comprehensive API client functionality."""
with patch('httpx.AsyncClient') as mock_client_class:
mock_client = AsyncMock()
mock_client_class.return_value.__aenter__.return_value = mock_client
# Configure responses for different endpoints
responses = {
'contacts': json.dumps({"contacts": sample_data["contacts"]}),
'tags': json.dumps({"tags": sample_data["tags"]}),
'contact_detail': json.dumps(sample_data["contacts"][0]),
'tag_detail': json.dumps(sample_data["tags"][0]),
'update_success': json.dumps({"success": True})
}
def mock_response(url, method='GET', **kwargs):
response = MagicMock()
response.status_code = 200
response.is_success = True
if 'contacts' in str(url):
if method == 'PUT':
response.text = responses['update_success']
elif '/contacts/' in str(url):
response.text = responses['contact_detail']
else:
response.text = responses['contacts']
elif 'tags' in str(url):
if '/tags/' in str(url):
response.text = responses['tag_detail']
else:
response.text = responses['tags']
else:
response.text = '{"success": true}'
return response
mock_client.get.side_effect = lambda url, **kwargs: mock_response(url, 'GET', **kwargs)
mock_client.post.side_effect = lambda url, **kwargs: mock_response(url, 'POST', **kwargs)
mock_client.put.side_effect = lambda url, **kwargs: mock_response(url, 'PUT', **kwargs)
# Test API client with comprehensive operations
client = KeapApiService(api_key="test_key")
# Test contact operations
contacts = await client.get_contacts(limit=10)
assert "contacts" in contacts
assert len(contacts["contacts"]) == 2
# Test with various parameters
filtered_contacts = await client.get_contacts(
email="john@example.com",
given_name="John",
limit=50,
offset=0
)
assert "contacts" in filtered_contacts
# Test single contact retrieval
contact_detail = await client.get_contact("1")
assert contact_detail["id"] == 1
# Test tag operations
tags = await client.get_tags()
assert "tags" in tags
assert len(tags["tags"]) == 2
tag_detail = await client.get_tag("10")
assert tag_detail["id"] == 10
# Test update operations
update_result = await client.update_contact_custom_field("1", "7", "Updated")
assert update_result["success"] is True
# Test diagnostics
diagnostics = client.get_diagnostics()
assert "total_requests" in diagnostics
assert "successful_requests" in diagnostics
assert diagnostics["total_requests"] >= 5
@pytest.mark.asyncio
async def test_cache_system_comprehensive_integration(self, temp_db_path):
"""Test comprehensive cache system functionality."""
# Test CacheManager (in-memory wrapper)
cache_manager = CacheManager(db_path=temp_db_path, max_entries=100, max_memory_mb=10)
try:
# Test basic operations
test_data = {"id": 1, "name": "Test", "data": "value"}
await cache_manager.set("test_key", test_data, ttl=3600)
cached_data = await cache_manager.get("test_key")
assert cached_data == test_data
# Test TTL functionality
await cache_manager.set("short_ttl", {"temp": True}, ttl=0.1)
await asyncio.sleep(0.2)
expired_data = await cache_manager.get("short_ttl")
assert expired_data is None
# Test invalidation patterns
await cache_manager.set("contact:1:details", {"id": 1}, ttl=3600)
await cache_manager.set("contact:2:details", {"id": 2}, ttl=3600)
await cache_manager.invalidate_pattern("contact:1:*")
assert await cache_manager.get("contact:1:details") is None
assert await cache_manager.get("contact:2:details") is not None
# Test bulk operations
for i in range(20):
await cache_manager.set(f"bulk_{i}", {"id": i}, ttl=3600)
# Verify bulk data
for i in range(20):
cached = await cache_manager.get(f"bulk_{i}")
assert cached["id"] == i
# Test statistics
stats = cache_manager.get_stats()
assert "total_entries" in stats
assert "memory_usage_mb" in stats
assert stats["total_entries"] > 0
finally:
cache_manager.close()
@pytest.mark.asyncio
async def test_persistent_cache_comprehensive_integration(self, temp_db_path):
"""Test comprehensive persistent cache functionality."""
cache = PersistentCacheManager(
db_path=temp_db_path,
max_entries=1000,
max_memory_mb=50
)
try:
# Test initialization
assert cache.max_entries == 1000
assert cache.max_memory_mb == 50
# Test CRUD operations with complex data
complex_data = {
"contacts": [
{"id": 1, "name": "John", "emails": ["john@test.com"]},
{"id": 2, "name": "Jane", "emails": ["jane@test.com"]}
],
"metadata": {"total": 2, "timestamp": "2024-01-01T00:00:00Z"}
}
await cache.set("complex_key", complex_data, ttl=3600)
cached_complex = await cache.get("complex_key")
assert cached_complex == complex_data
# Test pattern invalidation
await cache.set("test:pattern:1", {"data": "1"}, ttl=3600)
await cache.set("test:pattern:2", {"data": "2"}, ttl=3600)
await cache.set("other:key", {"data": "other"}, ttl=3600)
await cache.invalidate_pattern("test:pattern:*")
assert await cache.get("test:pattern:1") is None
assert await cache.get("test:pattern:2") is None
assert await cache.get("other:key") is not None
# Test contact invalidation
await cache.set("contact:1:details", {"id": 1}, ttl=3600)
await cache.set("contacts:list", [{"id": 1}], ttl=3600)
await cache.invalidate_contacts([1])
assert await cache.get("contact:1:details") is None
assert await cache.get("contacts:list") is None
# Test memory management
large_data = "x" * 1000 # 1KB strings
for i in range(100):
await cache.set(f"large_{i}", large_data, ttl=3600)
stats = cache.get_stats()
assert stats["total_entries"] <= 1000
# Test cleanup operations
await cache.cleanup_expired()
await cache.vacuum_database()
finally:
cache.close()
@pytest.mark.asyncio
async def test_mcp_server_comprehensive_integration(self):
"""Test comprehensive MCP server functionality."""
server = KeapMCPServer()
# Test server properties
assert hasattr(server, 'name')
assert hasattr(server, 'version')
assert server.name == "keap-mcp-server"
assert server.version == "1.0.0"
# Test tool listing
tools = server.list_tools()
assert isinstance(tools, list)
assert len(tools) > 10 # Should have many tools
# Verify tool structure
tool_names = set()
for tool in tools:
assert hasattr(tool, 'name')
assert hasattr(tool, 'description')
assert hasattr(tool, 'inputSchema')
# Verify unique names
assert tool.name not in tool_names
tool_names.add(tool.name)
# Verify schema structure
schema = tool.inputSchema
assert hasattr(schema, 'type')
assert schema.type == "object"
assert hasattr(schema, 'properties')
# Test specific tool presence
expected_tools = [
"list_contacts", "get_tags", "search_contacts_by_email",
"get_contact_details", "apply_tags_to_contacts"
]
for expected_tool in expected_tools:
assert expected_tool in tool_names
@pytest.mark.asyncio
async def test_utility_functions_comprehensive_integration(self, sample_data):
"""Test comprehensive utility function coverage."""
from src.utils.contact_utils import (
get_custom_field_value, format_contact_data, process_contact_include_fields,
get_primary_email, get_full_name, get_tag_ids, format_contact_summary
)
from src.utils.filter_utils import (
apply_complex_filters, filter_by_name_pattern, evaluate_filter_condition,
get_nested_value, parse_date_value
)
# Test contact utilities with comprehensive data
contact = sample_data["contacts"][0]
# Test custom field extraction
vip_value = get_custom_field_value(contact, "7")
assert vip_value == "VIP"
# Test primary email
primary_email = get_primary_email(contact)
assert primary_email == "john@example.com"
# Test full name
full_name = get_full_name(contact)
assert "John Doe" == full_name
# Test tag IDs
tag_ids = get_tag_ids(contact)
assert tag_ids == [10, 20]
# Test contact formatting
formatted = format_contact_data(contact)
assert formatted["id"] == 1
# Test include fields processing
include_fields = ["email_addresses", "custom_fields", "tag_ids"]
processed = process_contact_include_fields(contact, include_fields)
for field in include_fields:
assert field in processed
# Test contact summary
summary = format_contact_summary(contact)
assert isinstance(summary, str)
assert "John" in summary
# Test filter utilities
contacts = sample_data["contacts"]
# Test complex filters
filters = [{"field": "given_name", "operator": "=", "value": "John"}]
filtered = apply_complex_filters(contacts, filters)
assert len(filtered) == 1
assert filtered[0]["given_name"] == "John"
# Test filter condition evaluation
condition = {"field": "family_name", "operator": "=", "value": "Doe"}
result = evaluate_filter_condition(contact, condition)
assert result is True
# Test nested value extraction
email_nested = get_nested_value(contact, "email_addresses.0.email")
assert email_nested == "john@example.com"
# Test name pattern filtering
name_items = [{"name": c["given_name"]} for c in contacts]
pattern_results = filter_by_name_pattern(name_items, "J*")
assert len(pattern_results) == 2 # John and Jane
# Test date parsing
date_str = "2024-01-15T10:30:00Z"
parsed_date = parse_date_value(date_str)
assert parsed_date.year == 2024
assert parsed_date.month == 1
assert parsed_date.day == 15
@pytest.mark.asyncio
async def test_optimization_components_integration(self, sample_data):
"""Test optimization components functionality."""
from src.mcp.optimization.optimization import QueryExecutor, QueryOptimizer, QueryMetrics
from src.mcp.optimization.api_optimization import ApiParameterOptimizer, OptimizationResult
# Mock dependencies
mock_api_client = AsyncMock()
mock_api_client.get_contacts.return_value = {"contacts": sample_data["contacts"]}
mock_cache_manager = AsyncMock()
mock_cache_manager.get.return_value = None
mock_cache_manager.set = AsyncMock()
# Test QueryExecutor
executor = QueryExecutor(mock_api_client, mock_cache_manager)
filters = [
{"field": "email", "operator": "=", "value": "john@example.com"},
{"field": "given_name", "operator": "contains", "value": "John"}
]
contacts, metrics = await executor.execute_optimized_query(
query_type="list_contacts",
filters=filters,
limit=50
)
assert len(contacts) == 2
assert isinstance(metrics, QueryMetrics)
assert metrics.query_type == "list_contacts"
assert metrics.api_calls >= 1
# Test QueryOptimizer
optimizer = QueryOptimizer()
strategy = optimizer.analyze_query(filters)
assert strategy in ["server_optimized", "hybrid", "client_optimized"]
performance_score = optimizer.calculate_performance_score(filters)
assert 0.0 <= performance_score <= 1.0
recommendations = optimizer.get_optimization_recommendations(filters)
assert isinstance(recommendations, list)
# Test ApiParameterOptimizer
api_optimizer = ApiParameterOptimizer()
optimization_result = api_optimizer.optimize_contact_query_parameters(filters)
assert isinstance(optimization_result, OptimizationResult)
assert optimization_result.optimization_strategy in [
"highly_optimized", "moderately_optimized", "minimally_optimized"
]
performance_analysis = api_optimizer.analyze_filter_performance(filters, "contact")
assert "performance_rating" in performance_analysis
assert "estimated_response_time_ms" in performance_analysis
field_info = api_optimizer.get_field_optimization_info("contact")
assert isinstance(field_info, dict)
# Test cache key generation
cache_key = executor._generate_cache_key(
query_type="list_contacts",
filters=filters,
limit=50
)
assert isinstance(cache_key, str)
assert len(cache_key) > 0
@pytest.mark.asyncio
async def test_error_handling_comprehensive_integration(self, temp_db_path):
"""Test comprehensive error handling across all components."""
# Test API client error handling
with patch('httpx.AsyncClient') as mock_client_class:
mock_client = AsyncMock()
mock_client_class.return_value.__aenter__.return_value = mock_client
# Test various error scenarios
mock_client.get.side_effect = [
Exception("Connection timeout"),
MagicMock(status_code=429, is_success=False, text='{"error": "Rate limited"}'),
MagicMock(status_code=200, is_success=True, text='{"contacts": []}')
]
client = KeapApiService(api_key="test_key")
try:
# Should eventually succeed after retries
result = await client.get_contacts()
assert "contacts" in result
except Exception:
# Error handling is acceptable
pass
# Test cache error handling
cache = PersistentCacheManager(db_path=temp_db_path)
try:
# Test with valid operations first
await cache.set("test", {"data": "value"}, ttl=3600)
# Test database corruption scenario
with patch.object(cache, '_execute_db_operation') as mock_db:
mock_db.side_effect = Exception("Database error")
try:
await cache.get("test")
except Exception:
pass # Expected error
finally:
cache.close()
@pytest.mark.asyncio
async def test_performance_stress_integration(self, temp_db_path):
"""Test performance under stress conditions."""
cache = PersistentCacheManager(db_path=temp_db_path, max_entries=1000)
try:
# High-volume concurrent operations
async def stress_worker(worker_id):
for i in range(50):
key = f"stress_{worker_id}_{i}"
data = {
"worker": worker_id,
"item": i,
"timestamp": time.time(),
"payload": f"data_{worker_id}_{i}" * 10
}
await cache.set(key, data, ttl=3600)
# Verify data integrity
cached = await cache.get(key)
assert cached["worker"] == worker_id
start_time = time.time()
tasks = [stress_worker(i) for i in range(10)]
await asyncio.gather(*tasks)
end_time = time.time()
# Should complete within reasonable time
total_time = end_time - start_time
assert total_time < 15.0 # 15 seconds for 500 operations
# Verify cache statistics
stats = cache.get_stats()
assert stats["total_entries"] > 0
assert stats["total_entries"] <= 1000
finally:
cache.close()
@pytest.mark.asyncio
async def test_data_consistency_integration(self, sample_data):
"""Test data consistency across operations."""
# Mock consistent data across different access methods
mock_api_client = AsyncMock()
mock_api_client.get_contacts.return_value = {"contacts": sample_data["contacts"]}
mock_api_client.get_contact.side_effect = lambda contact_id: next(
(contact for contact in sample_data["contacts"] if contact["id"] == int(contact_id)), None
)
# Test data consistency verification
all_contacts = mock_api_client.get_contacts.return_value["contacts"]
john_contact = await mock_api_client.get_contact("1")
# Verify consistent data
john_from_list = next((c for c in all_contacts if c["id"] == 1), None)
assert john_from_list is not None
assert john_contact is not None
assert john_from_list["id"] == john_contact["id"]
assert john_from_list["given_name"] == john_contact["given_name"]
assert john_from_list["family_name"] == john_contact["family_name"]
@pytest.mark.asyncio
async def test_schema_validation_integration(self):
"""Test schema validation with various data types."""
from src.schemas.definitions import Contact, Tag, FilterCondition
# Test Contact model
contact_data = {
"id": 1,
"given_name": "John",
"family_name": "Doe",
"email_addresses": [{"email": "john@example.com", "field": "EMAIL1"}],
"tag_ids": [10, 20],
"custom_fields": [{"id": 7, "content": "VIP"}]
}
contact = Contact(**contact_data)
assert contact.id == 1
assert contact.given_name == "John"
# Test Tag model
tag_data = {
"id": 10,
"name": "Customer",
"description": "Customer tag"
}
tag = Tag(**tag_data)
assert tag.id == 10
assert tag.name == "Customer"
# Test FilterCondition model
filter_data = {
"field": "email",
"operator": "EQUALS",
"value": "john@example.com"
}
filter_condition = FilterCondition(**filter_data)
assert filter_condition.field == "email"
assert filter_condition.operator == "EQUALS"
assert filter_condition.value == "john@example.com"
@pytest.mark.asyncio
async def test_concurrent_operations_integration(self, sample_data):
"""Test concurrent operations across components."""
# Setup mock dependencies for concurrent testing
mock_api_client = AsyncMock()
mock_api_client.get_contacts.return_value = {"contacts": sample_data["contacts"]}
mock_api_client.get_tags.return_value = {"tags": sample_data["tags"]}
mock_cache_manager = AsyncMock()
mock_cache_manager.get.return_value = None
mock_cache_manager.set = AsyncMock()
async def api_operations():
contacts = await mock_api_client.get_contacts()
tags = await mock_api_client.get_tags()
return len(contacts["contacts"]), len(tags["tags"])
async def cache_operations():
await mock_cache_manager.set("test1", {"data": "value1"}, ttl=3600)
await mock_cache_manager.set("test2", {"data": "value2"}, ttl=3600)
return True
async def utility_operations():
from src.utils.filter_utils import apply_complex_filters
contacts = sample_data["contacts"]
filters = [{"field": "given_name", "operator": "!=", "value": "Bob"}]
filtered = apply_complex_filters(contacts, filters)
return len(filtered)
# Execute operations concurrently
start_time = time.time()
results = await asyncio.gather(
api_operations(),
cache_operations(),
utility_operations(),
return_exceptions=True
)
end_time = time.time()
# Verify performance and results
execution_time = end_time - start_time
assert execution_time < 1.0 # Should complete quickly
# Verify all operations completed successfully
assert len(results) == 3
assert all(not isinstance(result, Exception) for result in results)
contact_count, tag_count = results[0]
cache_result = results[1]
filtered_count = results[2]
assert contact_count == 2
assert tag_count == 2
assert cache_result is True
assert filtered_count == 2 # All contacts (none named Bob)