"""Tests for contact resolution functionality."""
import sqlite3
from pathlib import Path
import pytest
from src.jons_mcp_imessage.db.contacts import (
ContactInfo,
ContactResolver,
ContactCache,
get_contact_cache,
)
@pytest.fixture
def temp_contacts_db(tmp_path: Path) -> Path:
"""Create a temporary contacts database for testing."""
db_path = tmp_path / "test_contacts.abcddb"
conn = sqlite3.connect(db_path)
# Create the necessary tables
conn.execute(
"""
CREATE TABLE ZABCDRECORD (
Z_PK INTEGER PRIMARY KEY,
Z_ENT INTEGER,
ZFIRSTNAME TEXT,
ZLASTNAME TEXT,
ZNICKNAME TEXT,
ZORGANIZATION TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE ZABCDPHONENUMBER (
Z_PK INTEGER PRIMARY KEY,
ZOWNER INTEGER,
ZFULLNUMBER TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE ZABCDEMAILADDRESS (
Z_PK INTEGER PRIMARY KEY,
ZOWNER INTEGER,
ZADDRESS TEXT
)
"""
)
# Insert test data
# Contact 1: Full information
conn.execute(
"""
INSERT INTO ZABCDRECORD (Z_PK, Z_ENT, ZFIRSTNAME, ZLASTNAME, ZNICKNAME, ZORGANIZATION)
VALUES (1, 22, 'John', 'Smith', 'Johnny', 'ACME Corp')
"""
)
conn.execute(
"""
INSERT INTO ZABCDPHONENUMBER (Z_PK, ZOWNER, ZFULLNUMBER)
VALUES (1, 1, '+1 (555) 123-4567')
"""
)
conn.execute(
"""
INSERT INTO ZABCDEMAILADDRESS (Z_PK, ZOWNER, ZADDRESS)
VALUES (1, 1, 'john.smith@example.com')
"""
)
# Contact 2: Name only (no nickname)
conn.execute(
"""
INSERT INTO ZABCDRECORD (Z_PK, Z_ENT, ZFIRSTNAME, ZLASTNAME, ZNICKNAME, ZORGANIZATION)
VALUES (2, 22, 'Jane', 'Doe', NULL, NULL)
"""
)
conn.execute(
"""
INSERT INTO ZABCDPHONENUMBER (Z_PK, ZOWNER, ZFULLNUMBER)
VALUES (2, 2, '555-987-6543')
"""
)
# Contact 3: Organization only
conn.execute(
"""
INSERT INTO ZABCDRECORD (Z_PK, Z_ENT, ZFIRSTNAME, ZLASTNAME, ZNICKNAME, ZORGANIZATION)
VALUES (3, 22, NULL, NULL, NULL, 'Tech Support')
"""
)
conn.execute(
"""
INSERT INTO ZABCDEMAILADDRESS (Z_PK, ZOWNER, ZADDRESS)
VALUES (2, 3, 'support@example.com')
"""
)
# Contact 4: Multiple phone numbers
conn.execute(
"""
INSERT INTO ZABCDRECORD (Z_PK, Z_ENT, ZFIRSTNAME, ZLASTNAME, ZNICKNAME, ZORGANIZATION)
VALUES (4, 22, 'Bob', 'Johnson', NULL, NULL)
"""
)
conn.execute(
"""
INSERT INTO ZABCDPHONENUMBER (Z_PK, ZOWNER, ZFULLNUMBER)
VALUES (3, 4, '+1-555-111-2222')
"""
)
conn.execute(
"""
INSERT INTO ZABCDPHONENUMBER (Z_PK, ZOWNER, ZFULLNUMBER)
VALUES (4, 4, '+1-555-333-4444')
"""
)
conn.commit()
conn.close()
return db_path
class TestContactInfo:
"""Tests for ContactInfo dataclass."""
def test_display_name_nickname_priority(self):
"""Nickname should be highest priority."""
contact = ContactInfo(
first_name="John",
last_name="Smith",
nickname="Johnny",
organization="ACME Corp",
)
assert contact.get_display_name() == "Johnny"
def test_display_name_full_name(self):
"""First + last name should be second priority."""
contact = ContactInfo(
first_name="John",
last_name="Smith",
nickname=None,
organization="ACME Corp",
)
assert contact.get_display_name() == "John Smith"
def test_display_name_first_name_only(self):
"""First name alone should work."""
contact = ContactInfo(
first_name="John",
last_name=None,
nickname=None,
organization="ACME Corp",
)
assert contact.get_display_name() == "John"
def test_display_name_last_name_only(self):
"""Last name alone should work."""
contact = ContactInfo(
first_name=None,
last_name="Smith",
nickname=None,
organization="ACME Corp",
)
assert contact.get_display_name() == "Smith"
def test_display_name_organization_fallback(self):
"""Organization should be used as last resort."""
contact = ContactInfo(
first_name=None,
last_name=None,
nickname=None,
organization="ACME Corp",
)
assert contact.get_display_name() == "ACME Corp"
def test_display_name_none_when_no_data(self):
"""Should return None when no name information available."""
contact = ContactInfo(
first_name=None,
last_name=None,
nickname=None,
organization=None,
)
assert contact.get_display_name() is None
class TestContactResolver:
"""Tests for ContactResolver class."""
def test_resolver_initialization(self):
"""Resolver should start with empty mappings."""
resolver = ContactResolver()
assert len(resolver.phone_to_contact) == 0
assert len(resolver.email_to_contact) == 0
def test_load_contacts_from_db(self, temp_contacts_db: Path):
"""Should load contacts from database correctly."""
resolver = ContactResolver()
contacts_loaded = resolver.load_contacts_from_db(temp_contacts_db)
# Should have loaded 4 contacts
assert contacts_loaded == 4
# Check phone mappings
assert len(resolver.phone_to_contact) > 0
# Check email mappings
assert len(resolver.email_to_contact) > 0
def test_resolve_phone_number(self, temp_contacts_db: Path):
"""Should resolve phone numbers to display names."""
resolver = ContactResolver()
resolver.load_contacts_from_db(temp_contacts_db)
# Test various formats of the same number (with country code)
assert resolver.resolve("+15551234567") == "Johnny"
assert resolver.resolve("+1 (555) 123-4567") == "Johnny"
assert resolver.resolve("+1-555-123-4567") == "Johnny"
def test_resolve_email(self, temp_contacts_db: Path):
"""Should resolve emails to display names."""
resolver = ContactResolver()
resolver.load_contacts_from_db(temp_contacts_db)
# Email should resolve (case-insensitive)
assert resolver.resolve("john.smith@example.com") == "Johnny"
assert resolver.resolve("JOHN.SMITH@EXAMPLE.COM") == "Johnny"
def test_resolve_returns_none_for_me(self, temp_contacts_db: Path):
"""Should return None for 'me' sender."""
resolver = ContactResolver()
resolver.load_contacts_from_db(temp_contacts_db)
assert resolver.resolve("me") is None
def test_resolve_returns_none_for_unknown(self, temp_contacts_db: Path):
"""Should return None for unknown handles."""
resolver = ContactResolver()
resolver.load_contacts_from_db(temp_contacts_db)
assert resolver.resolve("+19999999999") is None
assert resolver.resolve("unknown@example.com") is None
def test_display_name_fallbacks(self, temp_contacts_db: Path):
"""Should use correct display name fallbacks."""
resolver = ContactResolver()
resolver.load_contacts_from_db(temp_contacts_db)
# Contact with nickname -> use nickname
assert resolver.resolve("+15551234567") == "Johnny"
# Contact with only name -> use full name
assert resolver.resolve("5559876543") == "Jane Doe"
# Contact with only organization -> use organization
assert resolver.resolve("support@example.com") == "Tech Support"
def test_multiple_phone_numbers(self, temp_contacts_db: Path):
"""Should handle contacts with multiple phone numbers."""
resolver = ContactResolver()
resolver.load_contacts_from_db(temp_contacts_db)
# Both phone numbers should resolve to same contact
assert resolver.resolve("+15551112222") == "Bob Johnson"
assert resolver.resolve("+15553334444") == "Bob Johnson"
def test_load_all_contacts_returns_stats(self, temp_contacts_db: Path, monkeypatch):
"""load_all_contacts should return statistics dictionary."""
from src.jons_mcp_imessage.db import contacts as contacts_module
# Mock discover_contacts_databases to return our test DB
def mock_discover():
return [temp_contacts_db]
monkeypatch.setattr(contacts_module, "discover_contacts_databases", mock_discover)
resolver = ContactResolver()
stats = resolver.load_all_contacts()
assert isinstance(stats, dict)
assert stats["databases_found"] == 1
assert stats["databases_loaded"] == 1
assert stats["total_contacts"] == 4
assert stats["total_phone_mappings"] > 0
assert stats["total_email_mappings"] > 0
def test_first_match_wins(self, tmp_path: Path, monkeypatch):
"""When duplicate handles exist, first match should win."""
# Create two databases with overlapping contacts
db1_path = tmp_path / "db1.abcddb"
db2_path = tmp_path / "db2.abcddb"
for db_path, first_name in [(db1_path, "First"), (db2_path, "Second")]:
conn = sqlite3.connect(db_path)
conn.execute(
"""
CREATE TABLE ZABCDRECORD (
Z_PK INTEGER PRIMARY KEY,
Z_ENT INTEGER,
ZFIRSTNAME TEXT,
ZLASTNAME TEXT,
ZNICKNAME TEXT,
ZORGANIZATION TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE ZABCDPHONENUMBER (
Z_PK INTEGER PRIMARY KEY,
ZOWNER INTEGER,
ZFULLNUMBER TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE ZABCDEMAILADDRESS (
Z_PK INTEGER PRIMARY KEY,
ZOWNER INTEGER,
ZADDRESS TEXT
)
"""
)
conn.execute(
f"""
INSERT INTO ZABCDRECORD (Z_PK, Z_ENT, ZFIRSTNAME, ZLASTNAME)
VALUES (1, 22, '{first_name}', 'Contact')
"""
)
conn.execute(
"""
INSERT INTO ZABCDPHONENUMBER (Z_PK, ZOWNER, ZFULLNUMBER)
VALUES (1, 1, '+15551234567')
"""
)
conn.commit()
conn.close()
from src.jons_mcp_imessage.db import contacts as contacts_module
# Mock to return both databases in order
def mock_discover():
return [db1_path, db2_path]
monkeypatch.setattr(contacts_module, "discover_contacts_databases", mock_discover)
resolver = ContactResolver()
resolver.load_all_contacts()
# Should get "First Contact" since db1 is loaded first
assert resolver.resolve("+15551234567") == "First Contact"
class TestContactCache:
"""Tests for ContactCache class."""
@pytest.mark.asyncio
async def test_singleton_pattern(self):
"""get_contact_cache should return the same instance."""
cache1 = get_contact_cache()
cache2 = get_contact_cache()
assert cache1 is cache2
@pytest.mark.asyncio
async def test_is_available_property(self):
"""is_available should reflect CONTACTS_ACCESS_AVAILABLE."""
cache = ContactCache()
# Should match the global flag
from src.jons_mcp_imessage.db.contacts import CONTACTS_ACCESS_AVAILABLE
assert cache.is_available == CONTACTS_ACCESS_AVAILABLE
@pytest.mark.asyncio
async def test_resolve_name_when_unavailable(self, monkeypatch):
"""resolve_name should return None when contacts unavailable."""
from src.jons_mcp_imessage.db import contacts as contacts_module
# Mock contacts as unavailable
monkeypatch.setattr(contacts_module, "CONTACTS_ACCESS_AVAILABLE", False)
cache = ContactCache()
name = await cache.resolve_name("+15551234567")
assert name is None
@pytest.mark.asyncio
async def test_resolve_name_loads_once(self, temp_contacts_db: Path, monkeypatch):
"""resolve_name should load contacts only once."""
from src.jons_mcp_imessage.db import contacts as contacts_module
# Mock discover to return our test DB
def mock_discover():
return [temp_contacts_db]
monkeypatch.setattr(contacts_module, "discover_contacts_databases", mock_discover)
cache = ContactCache()
# First call should load
name1 = await cache.resolve_name("+15551234567")
assert name1 == "Johnny"
assert cache._loaded is True
# Second call should not reload
name2 = await cache.resolve_name("+15551234567")
assert name2 == "Johnny"
@pytest.mark.asyncio
async def test_resolve_names_batch(self, temp_contacts_db: Path, monkeypatch):
"""resolve_names_batch should resolve multiple handles."""
from src.jons_mcp_imessage.db import contacts as contacts_module
def mock_discover():
return [temp_contacts_db]
monkeypatch.setattr(contacts_module, "discover_contacts_databases", mock_discover)
cache = ContactCache()
handles = [
"+15551234567", # Johnny
"5559876543", # Jane Doe
"support@example.com", # Tech Support
"+19999999999", # Unknown
]
results = await cache.resolve_names_batch(handles)
assert isinstance(results, dict)
assert len(results) == 4
assert results["+15551234567"] == "Johnny"
assert results["5559876543"] == "Jane Doe"
assert results["support@example.com"] == "Tech Support"
assert results["+19999999999"] is None
@pytest.mark.asyncio
async def test_resolve_names_batch_when_unavailable(self, monkeypatch):
"""resolve_names_batch should return all None when unavailable."""
from src.jons_mcp_imessage.db import contacts as contacts_module
monkeypatch.setattr(contacts_module, "CONTACTS_ACCESS_AVAILABLE", False)
cache = ContactCache()
handles = ["+15551234567", "test@example.com"]
results = await cache.resolve_names_batch(handles)
assert results == {"+15551234567": None, "test@example.com": None}
@pytest.mark.asyncio
async def test_refresh_reloads_contacts(self, tmp_path: Path, monkeypatch):
"""refresh should reload contacts from database."""
from src.jons_mcp_imessage.db import contacts as contacts_module
# Create initial database
db_path = tmp_path / "contacts.abcddb"
conn = sqlite3.connect(db_path)
conn.execute(
"""
CREATE TABLE ZABCDRECORD (
Z_PK INTEGER PRIMARY KEY,
Z_ENT INTEGER,
ZFIRSTNAME TEXT,
ZLASTNAME TEXT,
ZNICKNAME TEXT,
ZORGANIZATION TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE ZABCDPHONENUMBER (
Z_PK INTEGER PRIMARY KEY,
ZOWNER INTEGER,
ZFULLNUMBER TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE ZABCDEMAILADDRESS (
Z_PK INTEGER PRIMARY KEY,
ZOWNER INTEGER,
ZADDRESS TEXT
)
"""
)
conn.execute(
"""
INSERT INTO ZABCDRECORD (Z_PK, Z_ENT, ZFIRSTNAME, ZLASTNAME)
VALUES (1, 22, 'John', 'Smith')
"""
)
conn.execute(
"""
INSERT INTO ZABCDPHONENUMBER (Z_PK, ZOWNER, ZFULLNUMBER)
VALUES (1, 1, '+15551234567')
"""
)
conn.commit()
conn.close()
def mock_discover():
return [db_path]
monkeypatch.setattr(contacts_module, "discover_contacts_databases", mock_discover)
cache = ContactCache()
# Initial load
name1 = await cache.resolve_name("+15551234567")
assert name1 == "John Smith"
# Modify database to add a nickname
conn = sqlite3.connect(db_path)
conn.execute("UPDATE ZABCDRECORD SET ZNICKNAME = 'Johnny' WHERE Z_PK = 1")
conn.commit()
conn.close()
# Before refresh, should still return old value
name2 = await cache.resolve_name("+15551234567")
assert name2 == "John Smith"
# After refresh, should return new value
await cache.refresh()
name3 = await cache.resolve_name("+15551234567")
assert name3 == "Johnny"
@pytest.mark.asyncio
async def test_thread_safety_concurrent_access(
self, temp_contacts_db: Path, monkeypatch
):
"""Multiple concurrent calls should safely load contacts once."""
import asyncio
from src.jons_mcp_imessage.db import contacts as contacts_module
def mock_discover():
return [temp_contacts_db]
monkeypatch.setattr(contacts_module, "discover_contacts_databases", mock_discover)
cache = ContactCache()
# Simulate concurrent access
async def resolve_concurrent(handle: str):
return await cache.resolve_name(handle)
# Start multiple concurrent lookups
tasks = [
resolve_concurrent("+15551234567"),
resolve_concurrent("5559876543"),
resolve_concurrent("+15551234567"),
resolve_concurrent("support@example.com"),
]
results = await asyncio.gather(*tasks)
# All should succeed
assert results[0] == "Johnny"
assert results[1] == "Jane Doe"
assert results[2] == "Johnny"
assert results[3] == "Tech Support"
# Should only have loaded once
assert cache._loaded is True
class TestContactTools:
"""Tests for contact-related MCP tools."""
@pytest.mark.asyncio
async def test_lookup_contact_found(self, temp_contacts_db: Path, monkeypatch):
"""lookup_contact should return contact info when found."""
from src.jons_mcp_imessage.db import contacts as contacts_module
from src.jons_mcp_imessage.tools.contacts import lookup_contact
def mock_discover():
return [temp_contacts_db]
monkeypatch.setattr(contacts_module, "discover_contacts_databases", mock_discover)
monkeypatch.setattr(contacts_module, "CONTACTS_ACCESS_AVAILABLE", True)
# Clear the singleton cache so we get a fresh one with mocked discovery
contacts_module._contact_cache = None
result = await lookup_contact("+15551234567")
assert result is not None
assert result["name"] == "Johnny"
assert result["matched_handle"] == "+15551234567"
assert "error" not in result
@pytest.mark.asyncio
async def test_lookup_contact_not_found(self, temp_contacts_db: Path, monkeypatch):
"""lookup_contact should return None when contact not found."""
from src.jons_mcp_imessage.db import contacts as contacts_module
from src.jons_mcp_imessage.tools.contacts import lookup_contact
def mock_discover():
return [temp_contacts_db]
monkeypatch.setattr(contacts_module, "discover_contacts_databases", mock_discover)
monkeypatch.setattr(contacts_module, "CONTACTS_ACCESS_AVAILABLE", True)
# Clear the singleton cache
contacts_module._contact_cache = None
result = await lookup_contact("+19999999999")
assert result is None
@pytest.mark.asyncio
async def test_lookup_contact_unavailable(self, monkeypatch):
"""lookup_contact should return error when contacts unavailable."""
from src.jons_mcp_imessage.db import contacts as contacts_module
from src.jons_mcp_imessage.tools.contacts import lookup_contact
monkeypatch.setattr(contacts_module, "CONTACTS_ACCESS_AVAILABLE", False)
# Clear the singleton cache
contacts_module._contact_cache = None
result = await lookup_contact("+15551234567")
assert result is not None
assert "error" in result
assert "Contacts access not available" in result["error"]
@pytest.mark.asyncio
async def test_search_contacts_basic(self, tmp_path: Path, monkeypatch):
"""search_contacts should find matching handles in iMessage database."""
from contextlib import contextmanager
from src.jons_mcp_imessage.tools import contacts as contacts_module
# Create a temporary iMessage database
imessage_db = tmp_path / "chat.db"
conn = sqlite3.connect(imessage_db)
# Create handle table
conn.execute(
"""
CREATE TABLE handle (
ROWID INTEGER PRIMARY KEY,
id TEXT,
service TEXT,
country TEXT
)
"""
)
# Create chat_handle_join table
conn.execute(
"""
CREATE TABLE chat_handle_join (
chat_id INTEGER,
handle_id INTEGER
)
"""
)
# Insert test data
conn.execute(
"""
INSERT INTO handle (ROWID, id, service, country)
VALUES (1, '+15551234567', 'iMessage', 'us')
"""
)
conn.execute(
"""
INSERT INTO handle (ROWID, id, service, country)
VALUES (2, 'test@example.com', 'iMessage', NULL)
"""
)
conn.execute(
"""
INSERT INTO handle (ROWID, id, service, country)
VALUES (3, '+15559876543', 'SMS', 'us')
"""
)
# Add some chat associations
conn.execute("INSERT INTO chat_handle_join VALUES (1, 1)")
conn.execute("INSERT INTO chat_handle_join VALUES (2, 1)")
conn.execute("INSERT INTO chat_handle_join VALUES (3, 2)")
conn.commit()
conn.close()
# Mock get_connection to return our test database
@contextmanager
def mock_get_connection(timeout=5.0):
test_conn = sqlite3.connect(imessage_db)
test_conn.row_factory = sqlite3.Row
try:
yield test_conn
finally:
test_conn.close()
# Patch where it's used, not where it's defined
monkeypatch.setattr(contacts_module, "get_connection", mock_get_connection)
# Search for phone number (normalized to 5551234567)
result = await contacts_module.search_contacts(query="555-123-4567", limit=10)
assert result["total"] >= 1
assert len(result["results"]) >= 1
assert any("5551234567" in r["id"] for r in result["results"])
# Search for email
result = await contacts_module.search_contacts(query="example.com", limit=10)
assert result["total"] >= 1
assert len(result["results"]) >= 1
assert any("example.com" in r["id"] for r in result["results"])
@pytest.mark.asyncio
async def test_search_contacts_limit(self, tmp_path: Path, monkeypatch):
"""search_contacts should respect limit parameter."""
from contextlib import contextmanager
from src.jons_mcp_imessage.tools import contacts as contacts_module
# Create a temporary iMessage database with multiple handles
imessage_db = tmp_path / "chat.db"
conn = sqlite3.connect(imessage_db)
conn.execute(
"""
CREATE TABLE handle (
ROWID INTEGER PRIMARY KEY,
id TEXT,
service TEXT,
country TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE chat_handle_join (
chat_id INTEGER,
handle_id INTEGER
)
"""
)
# Insert 5 handles that match "555"
for i in range(5):
conn.execute(
f"""
INSERT INTO handle (ROWID, id, service, country)
VALUES ({i + 1}, '+1555123456{i}', 'iMessage', 'us')
"""
)
conn.commit()
conn.close()
# Mock get_connection to return our test database
@contextmanager
def mock_get_connection(timeout=5.0):
test_conn = sqlite3.connect(imessage_db)
test_conn.row_factory = sqlite3.Row
try:
yield test_conn
finally:
test_conn.close()
# Patch where it's used
monkeypatch.setattr(contacts_module, "get_connection", mock_get_connection)
# Search with limit=2
result = await contacts_module.search_contacts(query="555", limit=2)
assert result["total"] == 5
assert len(result["results"]) == 2