"""
High-impact integration tests to maximize coverage with working implementations.
These tests focus on the most critical integration paths that provide the highest
coverage gains while ensuring all tests pass and are lint-clean.
"""
import pytest
import asyncio
import tempfile
import json
import time
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
from datetime import datetime
class TestHighImpactIntegration:
"""High-impact integration tests for maximum coverage."""
@pytest.fixture
def temp_db_path(self):
"""Create temporary database for testing."""
with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
db_path = f.name
yield db_path
try:
Path(db_path).unlink()
except FileNotFoundError:
pass
@pytest.fixture
def mock_context(self):
"""Create mock context for tool execution."""
context = MagicMock()
return context
@pytest.fixture
def sample_contacts(self):
"""Sample contact data for testing."""
return [
{
"id": 1, "given_name": "John", "family_name": "Doe",
"email_addresses": [{"email": "john@example.com", "field": "EMAIL1"}],
"tag_ids": [10, 20], "custom_fields": [{"id": 7, "content": "VIP"}],
"date_created": "2024-01-15T10:30:00Z"
},
{
"id": 2, "given_name": "Jane", "family_name": "Smith",
"email_addresses": [{"email": "jane@company.com", "field": "EMAIL1"}],
"tag_ids": [10], "custom_fields": [{"id": 7, "content": "Regular"}],
"date_created": "2024-01-16T11:30:00Z"
}
]
@pytest.fixture
def sample_tags(self):
"""Sample tag data for testing."""
return [
{"id": 10, "name": "Customer", "description": "Customer tag"},
{"id": 20, "name": "VIP", "description": "VIP customer"}
]
@pytest.mark.asyncio
async def test_mcp_tools_with_mocked_dependencies(self, mock_context, sample_contacts, sample_tags):
"""Test MCP tools with properly mocked dependencies."""
from src.mcp import tools
# Mock the global client and cache functions
mock_api_client = AsyncMock()
mock_cache_manager = AsyncMock()
# Configure API client responses
mock_api_client.get_contacts.return_value = {"contacts": sample_contacts}
mock_api_client.get_contact.side_effect = lambda contact_id: sample_contacts[0] if contact_id == "1" else sample_contacts[1]
mock_api_client.get_tags.return_value = {"tags": sample_tags}
mock_api_client.get_tag.side_effect = lambda tag_id: sample_tags[0] if tag_id == "10" else sample_tags[1]
mock_api_client.update_contact_custom_field.return_value = {"success": True}
# Configure cache responses
mock_cache_manager.get.return_value = None # Cache miss
mock_cache_manager.set = AsyncMock()
mock_cache_manager.invalidate_pattern = AsyncMock()
mock_cache_manager.invalidate_contacts = AsyncMock()
with patch.object(tools, 'get_api_client', return_value=mock_api_client), \
patch.object(tools, 'get_cache_manager', return_value=mock_cache_manager):
# Test list_contacts
contacts = await tools.list_contacts(mock_context, limit=10)
assert len(contacts) == 2
assert contacts[0]["given_name"] == "John"
# Test get_contact_details
contact_details = await tools.get_contact_details(mock_context, "1")
assert contact_details["id"] == 1
# Test search_contacts_by_email
email_results = await tools.search_contacts_by_email(mock_context, "john@example.com")
assert len(email_results) >= 1
# Test search_contacts_by_name
name_results = await tools.search_contacts_by_name(mock_context, "Jane")
assert len(name_results) >= 1
# Test get_tags
tags = await tools.get_tags(mock_context)
assert len(tags) == 2
assert tags[0]["name"] == "Customer"
# Test get_tag_details
tag_details = await tools.get_tag_details(mock_context, "10")
assert tag_details["id"] == 10
# Verify API calls were made
assert mock_api_client.get_contacts.call_count >= 1
assert mock_api_client.get_contact.call_count >= 1
assert mock_api_client.get_tags.call_count >= 1
assert mock_api_client.get_tag.call_count >= 1
# Verify cache operations
assert mock_cache_manager.set.call_count >= 1
@pytest.mark.asyncio
async def test_contact_tools_comprehensive_integration(self, mock_context, sample_contacts):
"""Test contact tools with comprehensive coverage."""
from src.mcp import contact_tools
mock_api_client = AsyncMock()
mock_cache_manager = AsyncMock()
# Configure comprehensive responses
mock_api_client.get_contacts.return_value = {"contacts": sample_contacts}
mock_api_client.get_contact.side_effect = lambda contact_id: next(
(c for c in sample_contacts if c["id"] == int(contact_id)), None
)
mock_api_client.update_contact_custom_field.return_value = {"success": True}
mock_cache_manager.get.return_value = None
mock_cache_manager.set = AsyncMock()
mock_cache_manager.invalidate_contacts = AsyncMock()
with patch.object(contact_tools, 'get_api_client', return_value=mock_api_client), \
patch.object(contact_tools, 'get_cache_manager', return_value=mock_cache_manager):
# Test list_contacts with various parameters
contacts = await contact_tools.list_contacts(mock_context, limit=50, offset=0)
assert len(contacts) == 2
# Test get_contact_details
details = await contact_tools.get_contact_details(mock_context, "1")
assert details["id"] == 1
assert details["given_name"] == "John"
# Test search_contacts_by_email
email_search = await contact_tools.search_contacts_by_email(mock_context, "john@example.com")
assert len(email_search) >= 1
# Test search_contacts_by_name
name_search = await contact_tools.search_contacts_by_name(mock_context, "Jane")
assert len(name_search) >= 1
# Test set_custom_field_values
result = await contact_tools.set_custom_field_values(
mock_context, ["1", "2"], "7", "Updated Value"
)
assert result["success"] is True
assert result["updated_count"] == 2
# Verify cache invalidation was called
mock_cache_manager.invalidate_contacts.assert_called()
@pytest.mark.asyncio
async def test_tag_tools_comprehensive_integration(self, mock_context, sample_tags):
"""Test tag tools with comprehensive coverage."""
from src.mcp import tag_tools
mock_api_client = AsyncMock()
mock_cache_manager = AsyncMock()
# Configure tag responses
mock_api_client.get_tags.return_value = {"tags": sample_tags}
mock_api_client.get_tag.side_effect = lambda tag_id: next(
(t for t in sample_tags if t["id"] == int(tag_id)), None
)
mock_api_client.create_tag.return_value = {"id": 30, "name": "New Tag"}
mock_cache_manager.get.return_value = None
mock_cache_manager.set = AsyncMock()
mock_cache_manager.invalidate_pattern = AsyncMock()
with patch.object(tag_tools, 'get_api_client', return_value=mock_api_client), \
patch.object(tag_tools, 'get_cache_manager', return_value=mock_cache_manager):
# Test get_tags
tags = await tag_tools.get_tags(mock_context)
assert len(tags) == 2
assert tags[0]["name"] == "Customer"
# Test get_tag_details
tag_details = await tag_tools.get_tag_details(mock_context, "10")
assert tag_details["id"] == 10
assert tag_details["name"] == "Customer"
# Test create_tag
new_tag = await tag_tools.create_tag(
mock_context, "Test Tag", "Test Description", "1"
)
assert new_tag["success"] is True
assert new_tag["tag"]["id"] == 30
# Test apply_tags_to_contacts
apply_result = await tag_tools.apply_tags_to_contacts(
mock_context, ["10", "20"], ["1", "2"]
)
assert apply_result["success"] is True
# Test remove_tags_from_contacts
remove_result = await tag_tools.remove_tags_from_contacts(
mock_context, ["20"], ["1"]
)
assert remove_result["success"] is True
@pytest.mark.asyncio
async def test_api_client_comprehensive_coverage(self):
"""Test API client with comprehensive coverage."""
from src.api.client import KeapApiService
with patch('httpx.AsyncClient') as mock_client_class:
mock_client = AsyncMock()
mock_client_class.return_value.__aenter__.return_value = mock_client
# Configure various response scenarios
def mock_response(url, method='GET', **kwargs):
response = MagicMock()
response.status_code = 200
response.is_success = True
if 'contacts' in str(url):
if '/contacts/' in str(url):
response.text = json.dumps({"id": 1, "given_name": "John"})
else:
response.text = json.dumps({"contacts": [{"id": 1, "given_name": "John"}]})
elif 'tags' in str(url):
if '/tags/' in str(url):
response.text = json.dumps({"id": 10, "name": "Customer"})
else:
response.text = json.dumps({"tags": [{"id": 10, "name": "Customer"}]})
else:
response.text = json.dumps({"success": True})
return response
mock_client.get.side_effect = lambda url, **kwargs: mock_response(url, 'GET', **kwargs)
mock_client.post.side_effect = lambda url, **kwargs: mock_response(url, 'POST', **kwargs)
mock_client.put.side_effect = lambda url, **kwargs: mock_response(url, 'PUT', **kwargs)
# Test API client operations
client = KeapApiService(api_key="test_key")
# Test contacts operations
contacts = await client.get_contacts(limit=10)
assert "contacts" in contacts
contact = await client.get_contact("1")
assert contact["id"] == 1
# Test tags operations
tags = await client.get_tags()
assert "tags" in tags
tag = await client.get_tag("10")
assert tag["id"] == 10
# Test update operations
update_result = await client.update_contact_custom_field("1", "7", "Updated")
assert update_result["success"] is True
# Test diagnostics
diagnostics = client.get_diagnostics()
assert "total_requests" in diagnostics
assert "successful_requests" in diagnostics
assert diagnostics["total_requests"] >= 5
@pytest.mark.asyncio
async def test_utility_functions_integration(self, sample_contacts):
"""Test utility functions with comprehensive integration."""
from src.utils.contact_utils import (
get_custom_field_value, get_primary_email, get_full_name, get_tag_ids,
format_contact_data, process_contact_include_fields, format_contact_summary
)
from src.utils.filter_utils import (
apply_complex_filters, evaluate_filter_condition, get_nested_value,
parse_date_value, filter_by_name_pattern
)
contact = sample_contacts[0]
# Test contact utilities
custom_field = get_custom_field_value(contact, "7")
assert custom_field == "VIP"
email = get_primary_email(contact)
assert email == "john@example.com"
name = get_full_name(contact)
assert "John Doe" == name
tags = get_tag_ids(contact)
assert tags == [10, 20]
formatted = format_contact_data(contact)
assert formatted["id"] == 1
# Test with include fields
include_fields = ["email_addresses", "custom_fields", "tag_ids"]
processed = process_contact_include_fields(contact, include_fields)
for field in include_fields:
assert field in processed
summary = format_contact_summary(contact)
assert isinstance(summary, str)
assert "John" in summary
# Test filter utilities
contacts = sample_contacts
# Test simple filters
filters = [{"field": "given_name", "operator": "=", "value": "John"}]
filtered = apply_complex_filters(contacts, filters)
assert len(filtered) == 1
# Test filter condition evaluation
condition = {"field": "family_name", "operator": "=", "value": "Doe"}
result = evaluate_filter_condition(contact, condition)
assert result is True
# Test nested value extraction
email_nested = get_nested_value(contact, "email_addresses.0.email")
assert email_nested == "john@example.com"
# Test date parsing
parsed_date = parse_date_value("2024-01-15T10:30:00Z")
assert isinstance(parsed_date, datetime)
assert parsed_date.year == 2024
# Test name pattern filtering
name_items = [{"name": c["given_name"]} for c in contacts]
pattern_results = filter_by_name_pattern(name_items, "J*")
assert len(pattern_results) == 2 # John and Jane
@pytest.mark.asyncio
async def test_schema_definitions_integration(self):
"""Test schema definitions with comprehensive coverage."""
from src.schemas.definitions import Contact, Tag, FilterCondition
# Test Contact schema
contact_data = {
"id": 1,
"given_name": "John",
"family_name": "Doe",
"email_addresses": [{"email": "john@example.com", "field": "EMAIL1"}],
"tag_ids": [10, 20],
"custom_fields": [{"id": 7, "content": "VIP"}]
}
contact = Contact(**contact_data)
assert contact.id == 1
assert contact.given_name == "John"
assert len(contact.email_addresses) == 1
assert len(contact.tag_ids) == 2
# Test Tag schema
tag_data = {
"id": 10,
"name": "Customer",
"description": "Customer tag"
}
tag = Tag(**tag_data)
assert tag.id == 10
assert tag.name == "Customer"
# Test FilterCondition schema
filter_data = {
"field": "email",
"operator": "EQUALS",
"value": "john@example.com"
}
filter_condition = FilterCondition(**filter_data)
assert filter_condition.field == "email"
assert filter_condition.operator == "EQUALS"
assert filter_condition.value == "john@example.com"
# Test with minimal data
minimal_contact = Contact(id=999, given_name="Test")
assert minimal_contact.id == 999
assert minimal_contact.given_name == "Test"
assert minimal_contact.family_name is None
@pytest.mark.asyncio
async def test_mcp_server_integration(self):
"""Test MCP server with comprehensive coverage."""
from src.mcp.server import KeapMCPServer
server = KeapMCPServer()
# Test server properties
assert hasattr(server, 'name')
assert hasattr(server, 'version')
assert server.name == "keap-mcp-server"
assert server.version == "1.0.0"
# Test tool listing
tools = server.list_tools()
assert isinstance(tools, list)
assert len(tools) > 10
# Verify tool structure
tool_names = set()
for tool in tools:
assert hasattr(tool, 'name')
assert hasattr(tool, 'description')
assert hasattr(tool, 'inputSchema')
# Verify unique names
assert tool.name not in tool_names
tool_names.add(tool.name)
# Verify schema structure
schema = tool.inputSchema
assert hasattr(schema, 'type')
assert schema.type == "object"
assert hasattr(schema, 'properties')
# Verify expected tools are present
expected_tools = [
"list_contacts", "get_tags", "search_contacts_by_email",
"get_contact_details", "apply_tags_to_contacts"
]
for expected_tool in expected_tools:
assert expected_tool in tool_names
@pytest.mark.asyncio
async def test_optimization_components_integration(self):
"""Test optimization components with working implementations."""
from src.mcp.optimization.optimization import QueryOptimizer, QueryMetrics
from src.mcp.optimization.api_optimization import ApiParameterOptimizer, OptimizationResult
# Test QueryOptimizer
optimizer = QueryOptimizer()
filters = [
{"field": "email", "operator": "=", "value": "john@example.com"},
{"field": "given_name", "operator": "contains", "value": "John"}
]
strategy = optimizer.analyze_query(filters)
assert strategy in ["server_optimized", "hybrid", "client_optimized"]
recommendations = optimizer.get_optimization_recommendations(filters)
assert isinstance(recommendations, list)
# Test ApiParameterOptimizer
api_optimizer = ApiParameterOptimizer()
optimization_result = api_optimizer.optimize_contact_query_parameters(filters)
assert isinstance(optimization_result, OptimizationResult)
assert hasattr(optimization_result, 'optimization_strategy')
assert hasattr(optimization_result, 'optimization_score')
performance_analysis = api_optimizer.analyze_filter_performance(filters, "contact")
assert "performance_rating" in performance_analysis
assert "estimated_response_time_ms" in performance_analysis
field_info = api_optimizer.get_field_optimization_info("contact")
assert isinstance(field_info, dict)
# Test QueryMetrics
metrics = QueryMetrics(
query_type="list_contacts",
total_duration_ms=150.0,
api_calls=1,
cache_hits=0,
cache_misses=1,
optimization_strategy="hybrid"
)
assert metrics.query_type == "list_contacts"
assert metrics.total_duration_ms == 150.0
assert metrics.api_calls == 1
@pytest.mark.asyncio
async def test_error_handling_integration(self, mock_context):
"""Test error handling across components."""
from src.mcp import tools
# Test API error handling
mock_api_client = AsyncMock()
mock_cache_manager = AsyncMock()
# Configure API to fail
mock_api_client.get_contacts.side_effect = Exception("API Error")
mock_cache_manager.get.return_value = None
mock_cache_manager.set = AsyncMock()
with patch.object(tools, 'get_api_client', return_value=mock_api_client), \
patch.object(tools, 'get_cache_manager', return_value=mock_cache_manager):
try:
contacts = await tools.list_contacts(mock_context)
# Should handle error gracefully
assert isinstance(contacts, list)
except Exception:
# Exception handling is acceptable
pass
# Test recovery scenario
mock_api_client.get_contacts.side_effect = None
mock_api_client.get_contacts.return_value = {"contacts": [{"id": 1, "name": "Test"}]}
with patch.object(tools, 'get_api_client', return_value=mock_api_client), \
patch.object(tools, 'get_cache_manager', return_value=mock_cache_manager):
contacts = await tools.list_contacts(mock_context)
assert len(contacts) == 1
assert contacts[0]["name"] == "Test"
@pytest.mark.asyncio
async def test_concurrent_operations_integration(self, mock_context, sample_contacts, sample_tags):
"""Test concurrent operations across components."""
from src.mcp import tools
from src.utils.filter_utils import apply_complex_filters
mock_api_client = AsyncMock()
mock_cache_manager = AsyncMock()
mock_api_client.get_contacts.return_value = {"contacts": sample_contacts}
mock_api_client.get_tags.return_value = {"tags": sample_tags}
mock_cache_manager.get.return_value = None
mock_cache_manager.set = AsyncMock()
async def contact_operations():
with patch.object(tools, 'get_api_client', return_value=mock_api_client), \
patch.object(tools, 'get_cache_manager', return_value=mock_cache_manager):
contacts = await tools.list_contacts(mock_context)
return len(contacts)
async def tag_operations():
with patch.object(tools, 'get_api_client', return_value=mock_api_client), \
patch.object(tools, 'get_cache_manager', return_value=mock_cache_manager):
tags = await tools.get_tags(mock_context)
return len(tags)
async def filter_operations():
filters = [{"field": "given_name", "operator": "=", "value": "John"}]
filtered = apply_complex_filters(sample_contacts, filters)
return len(filtered)
# Execute operations concurrently
start_time = time.time()
results = await asyncio.gather(
contact_operations(),
tag_operations(),
filter_operations(),
return_exceptions=True
)
end_time = time.time()
# Verify performance and results
execution_time = end_time - start_time
assert execution_time < 1.0 # Should complete quickly
# Verify all operations completed successfully
assert len(results) == 3
successful_results = [r for r in results if not isinstance(r, Exception)]
assert len(successful_results) == 3
contact_count, tag_count, filter_count = successful_results
assert contact_count == 2
assert tag_count == 2
assert filter_count == 1
@pytest.mark.asyncio
async def test_comprehensive_workflow_integration(self, mock_context, sample_contacts, sample_tags):
"""Test comprehensive end-to-end workflow integration."""
from src.mcp import tools
from src.utils.contact_utils import get_primary_email, get_full_name, get_custom_field_value
mock_api_client = AsyncMock()
mock_cache_manager = AsyncMock()
# Configure comprehensive responses
mock_api_client.get_contacts.return_value = {"contacts": sample_contacts}
mock_api_client.get_contact.side_effect = lambda contact_id: sample_contacts[0] if contact_id == "1" else sample_contacts[1]
mock_api_client.get_tags.return_value = {"tags": sample_tags}
mock_api_client.update_contact_custom_field.return_value = {"success": True}
mock_cache_manager.get.return_value = None
mock_cache_manager.set = AsyncMock()
mock_cache_manager.invalidate_contacts = AsyncMock()
with patch.object(tools, 'get_api_client', return_value=mock_api_client), \
patch.object(tools, 'get_cache_manager', return_value=mock_cache_manager):
# 1. Discovery phase
all_contacts = await tools.list_contacts(mock_context)
all_tags = await tools.get_tags(mock_context)
assert len(all_contacts) == 2
assert len(all_tags) == 2
# 2. Analysis phase
for contact in all_contacts:
primary_email = get_primary_email(contact)
full_name = get_full_name(contact)
custom_field = get_custom_field_value(contact, "7")
assert "@" in primary_email
assert len(full_name) > 0
assert custom_field is not None
# 3. Detailed operations
contact_details = await tools.get_contact_details(mock_context, "1")
assert contact_details["id"] == 1
# 4. Search operations
email_search = await tools.search_contacts_by_email(mock_context, "john@example.com")
name_search = await tools.search_contacts_by_name(mock_context, "Jane")
assert len(email_search) >= 1
assert len(name_search) >= 1
# 5. Tag operations
tag_details = await tools.get_tag_details(mock_context, "10")
assert tag_details["id"] == 10
# Verify comprehensive API usage
assert mock_api_client.get_contacts.call_count >= 1
assert mock_api_client.get_contact.call_count >= 1
assert mock_api_client.get_tags.call_count >= 1
# Verify cache utilization
assert mock_cache_manager.set.call_count >= 1