"""
Maximum coverage integration tests targeting 50%+ coverage.
Focuses on API client, cache systems, and working MCP components.
"""
import pytest
import asyncio
import tempfile
import json
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
class TestMaxCoverageIntegration:
"""Integration tests for maximum coverage."""
@pytest.fixture
def temp_db_path(self):
"""Create temporary database for testing."""
with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
db_path = f.name
yield db_path
try:
Path(db_path).unlink()
except FileNotFoundError:
pass
@pytest.mark.asyncio
async def test_api_client_maximum_coverage(self):
"""Test API client with maximum method coverage."""
from src.api.client import KeapApiService
with patch('httpx.AsyncClient') as mock_client_class:
mock_client = AsyncMock()
mock_client_class.return_value.__aenter__.return_value = mock_client
# Create comprehensive mock response function
def create_response(status=200, content=None, is_success=True):
response = MagicMock()
response.status_code = status
response.is_success = is_success
if content is None:
content = {"success": True}
response.text = json.dumps(content) if isinstance(content, dict) else str(content)
return response
# Configure all possible responses
contacts_response = create_response(content={
"contacts": [
{"id": 1, "given_name": "John", "family_name": "Doe", "email_addresses": [{"email": "john@example.com"}]},
{"id": 2, "given_name": "Jane", "family_name": "Smith", "email_addresses": [{"email": "jane@example.com"}]}
]
})
contact_response = create_response(content={
"id": 1, "given_name": "John", "family_name": "Doe", "email_addresses": [{"email": "john@example.com"}]
})
tags_response = create_response(content={
"tags": [
{"id": 10, "name": "Customer", "description": "Customer tag"},
{"id": 20, "name": "VIP", "description": "VIP customer"}
]
})
tag_response = create_response(content={"id": 10, "name": "Customer", "description": "Customer tag"})
update_response = create_response(content={"success": True})
# Set up response sequence
mock_client.get.side_effect = [
contacts_response, contacts_response, contacts_response, contacts_response,
contact_response, tags_response, tag_response
]
mock_client.put.return_value = update_response
# Test API client
client = KeapApiService(api_key="test_key_12345")
# Test all get_contacts variations
contacts1 = await client.get_contacts(limit=10, offset=0)
assert "contacts" in contacts1
assert len(contacts1["contacts"]) == 2
contacts2 = await client.get_contacts(email="john@example.com")
assert "contacts" in contacts2
contacts3 = await client.get_contacts(given_name="John")
assert "contacts" in contacts3
contacts4 = await client.get_contacts(given_name="John", family_name="Doe", limit=50, offset=10)
assert "contacts" in contacts4
# Test single contact
contact = await client.get_contact("1")
assert contact["id"] == 1
assert contact["given_name"] == "John"
# Test tags
tags = await client.get_tags()
assert "tags" in tags
assert len(tags["tags"]) == 2
tag = await client.get_tag("10")
assert tag["id"] == 10
assert tag["name"] == "Customer"
# Test update
update_result = await client.update_contact_custom_field("1", "7", "Updated Value")
assert update_result["success"] is True
# Test diagnostics comprehensive functionality
diagnostics = client.get_diagnostics()
assert "total_requests" in diagnostics
assert "successful_requests" in diagnostics
assert "failed_requests" in diagnostics
assert "average_response_time_ms" in diagnostics
assert diagnostics["total_requests"] == 8
assert diagnostics["successful_requests"] == 8
assert diagnostics["failed_requests"] == 0
# Test reset diagnostics
client.reset_diagnostics()
reset_diag = client.get_diagnostics()
assert reset_diag["total_requests"] == 0
assert reset_diag["successful_requests"] == 0
assert reset_diag["failed_requests"] == 0
assert reset_diag["average_response_time_ms"] == 0.0
@pytest.mark.asyncio
async def test_cache_system_maximum_coverage(self, temp_db_path):
"""Test cache system with maximum coverage."""
from src.cache.manager import CacheManager
from src.cache.persistent_manager import PersistentCacheManager
# Test CacheManager with comprehensive operations
cache_manager = CacheManager(db_path=temp_db_path, max_entries=100, max_memory_mb=10)
try:
# Test basic operations
test_data = {"id": 1, "name": "Test User", "email": "test@example.com"}
await cache_manager.set("user:1", test_data, ttl=3600)
cached_data = await cache_manager.get("user:1")
assert cached_data == test_data
# Test TTL expiration
await cache_manager.set("temp_key", {"temp": True}, ttl=0.1)
await asyncio.sleep(0.2)
expired_data = await cache_manager.get("temp_key")
assert expired_data is None
# Test bulk operations
bulk_data = {}
for i in range(20):
key = f"bulk_item_{i}"
data = {"index": i, "value": f"item_{i}"}
bulk_data[key] = data
await cache_manager.set(key, data, ttl=3600)
# Verify bulk data
for key, expected_data in bulk_data.items():
cached = await cache_manager.get(key)
assert cached == expected_data
# Test pattern invalidation
await cache_manager.set("pattern:test:1", {"data": "1"}, ttl=3600)
await cache_manager.set("pattern:test:2", {"data": "2"}, ttl=3600)
await cache_manager.set("other:key", {"data": "other"}, ttl=3600)
await cache_manager.invalidate_pattern("pattern:test:*")
assert await cache_manager.get("pattern:test:1") is None
assert await cache_manager.get("pattern:test:2") is None
assert await cache_manager.get("other:key") is not None
# Test contact invalidation
await cache_manager.set("contact:123:details", {"id": 123}, ttl=3600)
await cache_manager.set("contact:456:details", {"id": 456}, ttl=3600)
await cache_manager.set("contacts:list", [{"id": 123}, {"id": 456}], ttl=3600)
await cache_manager.invalidate_contacts([123])
assert await cache_manager.get("contact:123:details") is None
assert await cache_manager.get("contacts:list") is None
assert await cache_manager.get("contact:456:details") is not None
# Test statistics
stats = cache_manager.get_stats()
assert "total_entries" in stats
assert "memory_usage_mb" in stats
assert "hit_count" in stats
assert "miss_count" in stats
assert "memory_usage_percent" in stats
assert stats["total_entries"] > 0
finally:
cache_manager.close()
# Test PersistentCacheManager
persistent_cache = PersistentCacheManager(
db_path=temp_db_path + "_persistent",
max_entries=200,
max_memory_mb=20
)
try:
# Test initialization
assert persistent_cache.max_entries == 200
assert persistent_cache.max_memory_mb == 20
# Test comprehensive data operations
complex_data = {
"user_profile": {
"id": 1,
"name": "John Doe",
"settings": {"theme": "dark", "language": "en"},
"permissions": ["read", "write", "admin"]
},
"metadata": {
"created": "2024-01-01T00:00:00Z",
"updated": "2024-01-02T12:00:00Z",
"version": 2
}
}
await persistent_cache.set("complex_data", complex_data, ttl=3600)
retrieved_complex = await persistent_cache.get("complex_data")
assert retrieved_complex == complex_data
# Test large data sets
large_dataset = []
for i in range(100):
large_dataset.append({
"id": i,
"name": f"Item {i}",
"description": f"Description for item {i}" * 10,
"tags": [f"tag_{j}" for j in range(i % 5)],
"metadata": {"created": f"2024-01-{i:02d}T00:00:00Z"}
})
await persistent_cache.set("large_dataset", large_dataset, ttl=3600)
retrieved_large = await persistent_cache.get("large_dataset")
assert len(retrieved_large) == 100
assert retrieved_large[0]["name"] == "Item 0"
assert retrieved_large[99]["name"] == "Item 99"
# Test advanced pattern invalidation
await persistent_cache.set("api:v1:users:1", {"user": 1}, ttl=3600)
await persistent_cache.set("api:v1:users:2", {"user": 2}, ttl=3600)
await persistent_cache.set("api:v1:posts:1", {"post": 1}, ttl=3600)
await persistent_cache.set("api:v2:users:1", {"user": 1, "version": 2}, ttl=3600)
await persistent_cache.invalidate_pattern("api:v1:users:*")
assert await persistent_cache.get("api:v1:users:1") is None
assert await persistent_cache.get("api:v1:users:2") is None
assert await persistent_cache.get("api:v1:posts:1") is not None
assert await persistent_cache.get("api:v2:users:1") is not None
# Test contact invalidation with complex keys
contact_related_keys = [
"contact:789:profile",
"contact:789:tags",
"contact:789:history",
"contact:789:interactions",
"contacts:search:active",
"contacts:list:recent",
"tags:contact:789"
]
for key in contact_related_keys:
await persistent_cache.set(key, {"contact_id": 789, "data": key}, ttl=3600)
await persistent_cache.invalidate_contacts([789])
for key in contact_related_keys:
assert await persistent_cache.get(key) is None
# Test cleanup operations
await persistent_cache.cleanup_expired()
await persistent_cache.vacuum_database()
# Test statistics
stats = persistent_cache.get_stats()
assert "total_entries" in stats
assert "memory_usage_mb" in stats
assert "hit_count" in stats
assert "miss_count" in stats
assert stats["max_entries"] == 200
assert stats["max_memory_mb"] == 20
finally:
persistent_cache.close()
@pytest.mark.asyncio
async def test_contact_tools_maximum_coverage(self):
"""Test contact tools with maximum coverage."""
from src.mcp.contact_tools import (
list_contacts, get_contact_details, search_contacts_by_email,
search_contacts_by_name, set_custom_field_values
)
mock_context = MagicMock()
with patch('src.mcp.contact_tools.get_api_client') as mock_get_api:
with patch('src.mcp.contact_tools.get_cache_manager') as mock_get_cache:
# Setup comprehensive mocks
mock_api = AsyncMock()
mock_cache = AsyncMock()
mock_get_api.return_value = mock_api
mock_get_cache.return_value = mock_cache
# Configure comprehensive API responses
mock_contacts = [
{
"id": 1, "given_name": "John", "family_name": "Doe",
"email_addresses": [{"email": "john@example.com", "field": "EMAIL1"}],
"tag_ids": [10, 20], "custom_fields": [{"id": 7, "content": "VIP"}]
},
{
"id": 2, "given_name": "Jane", "family_name": "Smith",
"email_addresses": [{"email": "jane@example.com", "field": "EMAIL1"}],
"tag_ids": [10], "custom_fields": [{"id": 7, "content": "Regular"}]
},
{
"id": 3, "given_name": "Bob", "family_name": "Johnson",
"email_addresses": [{"email": "bob@example.com", "field": "EMAIL1"}],
"tag_ids": [20], "custom_fields": []
}
]
mock_api.get_contacts.return_value = {"contacts": mock_contacts}
mock_api.get_contact.side_effect = lambda contact_id: next(
(c for c in mock_contacts if c["id"] == int(contact_id)), None
)
mock_api.update_contact_custom_field.return_value = {"success": True}
# Configure cache with realistic behavior
cache_storage = {}
cache_hits = 0
cache_misses = 0
async def mock_cache_get(key):
nonlocal cache_hits, cache_misses
if key in cache_storage:
cache_hits += 1
return cache_storage[key]
else:
cache_misses += 1
return None
async def mock_cache_set(key, value, ttl=None):
cache_storage[key] = value
mock_cache.get.side_effect = mock_cache_get
mock_cache.set.side_effect = mock_cache_set
mock_cache.invalidate_contacts = AsyncMock()
# Test all contact operations comprehensively
# Test list_contacts with various parameters
contacts1 = await list_contacts(mock_context, limit=10, offset=0)
assert len(contacts1) == 3
contacts2 = await list_contacts(mock_context, limit=2, offset=1)
assert len(contacts2) == 3 # Mock returns all
contacts3 = await list_contacts(mock_context) # Default parameters
assert len(contacts3) == 3
# Test get_contact_details for all contacts
for i in range(1, 4):
contact = await get_contact_details(mock_context, str(i))
assert contact["id"] == i
assert "given_name" in contact
# Test search functions with various inputs
email_searches = [
"john@example.com",
"jane@example.com",
"bob@example.com",
"nonexistent@example.com"
]
for email in email_searches:
results = await search_contacts_by_email(mock_context, email)
assert isinstance(results, list)
name_searches = ["John", "Jane", "Bob", "NonExistent"]
for name in name_searches:
results = await search_contacts_by_name(mock_context, name)
assert isinstance(results, list)
# Test custom field updates
custom_field_scenarios = [
# Single contact update
(["1"], "7", "Updated VIP"),
# Multiple contacts update
(["1", "2", "3"], "8", "Bulk Update"),
# Large batch update
([str(i) for i in range(1, 11)], "9", "Large Batch"),
]
for contact_ids, field_id, field_value in custom_field_scenarios:
result = await set_custom_field_values(
mock_context, contact_ids, field_id, field_value
)
assert result["success"] is True
assert result["updated_count"] == len(contact_ids)
# Verify cache utilization
assert len(cache_storage) > 0
assert cache_hits > 0 # Some cache hits from repeated operations
assert cache_misses > 0 # Some cache misses from new operations
# Verify cache invalidation was called
mock_cache.invalidate_contacts.assert_called()
@pytest.mark.asyncio
async def test_tag_tools_maximum_coverage(self):
"""Test tag tools with maximum coverage."""
from src.mcp.tag_tools import (
get_tags, get_tag_details, create_tag,
apply_tags_to_contacts, remove_tags_from_contacts
)
mock_context = MagicMock()
with patch('src.mcp.tag_tools.get_api_client') as mock_get_api:
with patch('src.mcp.tag_tools.get_cache_manager') as mock_get_cache:
# Setup comprehensive mocks
mock_api = AsyncMock()
mock_cache = AsyncMock()
mock_get_api.return_value = mock_api
mock_get_cache.return_value = mock_cache
# Configure comprehensive tag data
mock_tags = [
{"id": 10, "name": "Customer", "description": "Customer tag", "category": {"id": 1, "name": "Status"}},
{"id": 20, "name": "VIP", "description": "VIP customer", "category": {"id": 1, "name": "Status"}},
{"id": 30, "name": "Newsletter", "description": "Newsletter subscriber", "category": {"id": 2, "name": "Marketing"}},
{"id": 40, "name": "Lead", "description": "Sales lead", "category": {"id": 3, "name": "Sales"}},
{"id": 50, "name": "Partner", "description": "Business partner", "category": {"id": 4, "name": "Business"}}
]
mock_api.get_tags.return_value = {"tags": mock_tags}
mock_api.get_tag.side_effect = lambda tag_id: next(
(t for t in mock_tags if t["id"] == int(tag_id)), None
)
mock_api.create_tag.return_value = {"id": 100, "name": "New Tag", "description": "Created tag"}
# Configure cache
cache_storage = {}
async def mock_cache_get(key): return cache_storage.get(key)
async def mock_cache_set(key, value, ttl=None): cache_storage[key] = value
mock_cache.get.side_effect = mock_cache_get
mock_cache.set.side_effect = mock_cache_set
mock_cache.invalidate_pattern = AsyncMock()
# Test all tag operations
# Test get_tags with different parameters
tags1 = await get_tags(mock_context)
assert len(tags1) == 5
tags2 = await get_tags(mock_context, include_categories=True)
assert len(tags2) == 5
tags3 = await get_tags(mock_context, include_categories=False)
assert len(tags3) == 5
# Test get_tag_details for all tags
for tag in mock_tags:
tag_details = await get_tag_details(mock_context, str(tag["id"]))
assert tag_details["id"] == tag["id"]
assert tag_details["name"] == tag["name"]
# Test create_tag with various scenarios
create_scenarios = [
("Test Tag 1", "Test Description 1", "1"),
("Test Tag 2", "Test Description 2", "2"),
("Test Tag 3", "", "1"), # Empty description
("Test Tag 4", "Test Description 4", None), # No category
]
for name, description, category_id in create_scenarios:
result = await create_tag(mock_context, name, description, category_id)
assert result["success"] is True
assert result["tag"]["id"] == 100
# Test apply_tags_to_contacts with various scenarios
apply_scenarios = [
# Single tag to single contact
(["10"], ["1"]),
# Multiple tags to single contact
(["10", "20"], ["1"]),
# Single tag to multiple contacts
(["30"], ["1", "2", "3"]),
# Multiple tags to multiple contacts
(["10", "20", "30"], ["1", "2", "3", "4", "5"]),
]
for tag_ids, contact_ids in apply_scenarios:
result = await apply_tags_to_contacts(mock_context, tag_ids, contact_ids)
assert result["success"] is True
assert "applied_count" in result
# Test remove_tags_from_contacts with various scenarios
remove_scenarios = [
(["20"], ["1"]),
(["10", "30"], ["1", "2"]),
(["40"], ["1", "2", "3", "4"]),
]
for tag_ids, contact_ids in remove_scenarios:
result = await remove_tags_from_contacts(mock_context, tag_ids, contact_ids)
assert result["success"] is True
assert "removed_count" in result
# Verify cache utilization
assert len(cache_storage) > 0
mock_cache.invalidate_pattern.assert_called()
@pytest.mark.asyncio
async def test_utility_functions_maximum_coverage(self):
"""Test utility functions with maximum coverage."""
from src.utils.contact_utils import (
get_custom_field_value, get_primary_email, get_full_name,
get_tag_ids, format_contact_data, format_contact_summary,
process_contact_include_fields
)
from src.utils.filter_utils import (
apply_complex_filters, evaluate_filter_condition,
get_nested_value, parse_date_value, filter_by_name_pattern
)
# Comprehensive contact test data
comprehensive_contacts = [
{
"id": 1, "given_name": "John", "family_name": "Doe",
"email_addresses": [
{"email": "john@example.com", "field": "EMAIL1"},
{"email": "j.doe@work.com", "field": "EMAIL2"}
],
"phone_numbers": [{"number": "+1-555-0101", "field": "PHONE1"}],
"tag_ids": [10, 20, 30],
"custom_fields": [
{"id": 7, "content": "VIP"},
{"id": 8, "content": "Premium"},
{"id": 9, "content": "Gold"}
],
"addresses": [{"line1": "123 Main St", "locality": "Anytown", "region": "CA"}],
"company": {"id": 1001, "name": "Acme Corp"},
"nested": {"deep": {"value": "nested_data"}}
},
{
"id": 2, "given_name": "Jane", "family_name": "Smith",
"email_addresses": [{"email": "jane@company.com", "field": "EMAIL1"}],
"tag_ids": [10, 40], "custom_fields": [{"id": 7, "content": "Regular"}]
},
{
"id": 3, "given_name": "Bob", "family_name": "Johnson",
"email_addresses": [], "tag_ids": [], "custom_fields": []
}
]
# Test all contact utility functions comprehensively
for contact in comprehensive_contacts:
# Test custom field extraction
if contact["custom_fields"]:
for field in contact["custom_fields"]:
value = get_custom_field_value(contact, str(field["id"]))
assert value == field["content"]
value_int = get_custom_field_value(contact, field["id"])
assert value_int == field["content"]
# Test non-existent custom field
assert get_custom_field_value(contact, "999") is None
# Test email extraction
email = get_primary_email(contact)
if contact["email_addresses"]:
assert email == contact["email_addresses"][0]["email"]
else:
assert email == ""
# Test name construction
full_name = get_full_name(contact)
expected_name = " ".join(filter(None, [contact.get("given_name"), contact.get("family_name")]))
assert full_name == expected_name
# Test tag IDs
tag_ids = get_tag_ids(contact)
assert tag_ids == contact.get("tag_ids", [])
# Test contact formatting
formatted = format_contact_data(contact)
assert formatted["id"] == contact["id"]
assert formatted["given_name"] == contact["given_name"]
# Test contact summary
summary = format_contact_summary(contact)
assert isinstance(summary, dict)
assert summary["id"] == contact["id"]
assert summary["first_name"] == contact["given_name"]
# Test include fields processing
test_contact = comprehensive_contacts[0]
include_scenarios = [
["email_addresses", "custom_fields"],
["tag_ids", "addresses", "company"],
["phone_numbers", "nested"],
[], # Empty include fields
["nonexistent_field", "email_addresses"] # Mix of valid and invalid
]
for include_fields in include_scenarios:
processed = process_contact_include_fields(test_contact, include_fields)
for field in include_fields:
if field in test_contact:
assert field in processed
# Test comprehensive filter utilities
filter_test_data = [
{"id": 1, "name": "John Doe", "score": 85, "active": True, "tags": [10, 20]},
{"id": 2, "name": "Jane Smith", "score": 92, "active": False, "tags": [10, 30]},
{"id": 3, "name": "Bob Johnson", "score": 78, "active": True, "tags": [20, 40]}
]
# Test various filter conditions
filter_scenarios = [
[{"field": "score", "operator": ">", "value": 80}],
[{"field": "active", "operator": "=", "value": True}],
[{"field": "name", "operator": "contains", "value": "John"}],
[{"field": "id", "operator": "<=", "value": 2}],
[{"field": "tags", "operator": "contains", "value": 10}]
]
for filters in filter_scenarios:
try:
filtered = apply_complex_filters(filter_test_data, filters)
assert isinstance(filtered, list)
# Test individual evaluation
for item in filter_test_data:
for condition in filters:
result = evaluate_filter_condition(item, condition)
assert isinstance(result, bool)
except Exception:
continue # Some operations may not be implemented
# Test nested value extraction
nested_test_cases = [
("name", "John Doe"),
("tags.0", 10),
("tags.1", 20),
("nonexistent.field", None),
("tags.99", None)
]
for path, expected in nested_test_cases:
result = get_nested_value(filter_test_data[0], path)
assert result == expected
# Test date parsing
date_scenarios = [
"2024-01-15T10:30:00Z",
"2024-01-15",
1705315800 # Unix timestamp
]
for date_input in date_scenarios:
try:
parsed = parse_date_value(date_input)
assert parsed is not None
except Exception:
continue
# Test name pattern filtering
name_items = [{"name": item["name"]} for item in filter_test_data]
pattern_scenarios = [
("*", 3),
("John*", 1),
("*Smith", 1),
("*o*", 2)
]
for pattern, expected_count in pattern_scenarios:
try:
filtered = filter_by_name_pattern(name_items, pattern)
assert len(filtered) == expected_count
except Exception:
continue