Skip to main content
Glama
leeguooooo
by leeguooooo
test_contact_analysis.py (11.1 kB)
"""Tests for the contact analysis feature."""

import json
import os
import sqlite3
import tempfile
from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path

import pytest

from src.operations.contact_analysis import (
    ContactAnalyzer,
    analyze_contacts,
    get_contact_timeline,
)

# Full ``emails`` schema used by the shared ``temp_db`` fixture.
_CREATE_EMAILS_TABLE_SQL = """
    CREATE TABLE emails (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        account_id TEXT NOT NULL,
        folder_id INTEGER NOT NULL,
        uid TEXT NOT NULL,
        message_id TEXT,
        subject TEXT,
        sender TEXT,
        sender_email TEXT,
        recipients TEXT,
        date_sent TIMESTAMP,
        date_received TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        is_read BOOLEAN DEFAULT FALSE,
        is_flagged BOOLEAN DEFAULT FALSE,
        is_deleted BOOLEAN DEFAULT FALSE,
        has_attachments BOOLEAN DEFAULT FALSE,
        size_bytes INTEGER DEFAULT 0,
        content_hash TEXT,
        sync_status TEXT DEFAULT 'synced',
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
"""

# Parameterized insert shared by the fixture and the invalid-JSON test.
_INSERT_EMAIL_SQL = """
    INSERT INTO emails (
        account_id, folder_id, uid, message_id, subject,
        sender, sender_email, recipients, date_sent, is_deleted
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""


def _email_row(now, uid, subject, sender, sender_email, recipients,
               days_ago, is_deleted=False):
    """Build one parameter tuple for ``_INSERT_EMAIL_SQL``.

    Args:
        now: Reference ``datetime`` the email age is measured from.
        uid: Email UID as a string; the message id is ``'msg' + uid``.
        subject: Email subject.
        sender: Sender display name.
        sender_email: Sender address.
        recipients: List of recipient addresses; stored JSON-encoded.
        days_ago: Age of the email in days relative to ``now``.
        is_deleted: Value for the ``is_deleted`` column.

    Returns:
        A 10-tuple in the column order expected by ``_INSERT_EMAIL_SQL``.
    """
    return (
        'test_account',
        1,
        uid,
        'msg' + uid,
        subject,
        sender,
        sender_email,
        json.dumps(recipients),
        (now - timedelta(days=days_ago)).isoformat(),
        is_deleted,
    )


class TestContactAnalyzer:
    """Tests for ``ContactAnalyzer`` against a temporary SQLite database."""

    @pytest.fixture
    def temp_db(self):
        """Yield the path of a temporary database seeded with 12 emails.

        Seed data: Alice sends 5 emails, Bob 3 and Charlie 2 within the last
        30 days; one further email is 35 days old and one is flagged
        ``is_deleted``. The file is removed on teardown, and also when
        seeding itself fails.
        """
        fd, db_path = tempfile.mkstemp(suffix='.db')
        os.close(fd)

        try:
            now = datetime.now()
            test_data = [
                # Alice sent 5 emails.
                _email_row(now, '1', 'Test 1', 'Alice', 'alice@example.com',
                           ['bob@example.com'], 5),
                _email_row(now, '2', 'Test 2', 'Alice', 'alice@example.com',
                           ['bob@example.com'], 10),
                _email_row(now, '3', 'Test 3', 'Alice', 'alice@example.com',
                           ['charlie@example.com'], 15),
                _email_row(now, '4', 'Test 4', 'Alice', 'alice@example.com',
                           ['bob@example.com'], 20),
                _email_row(now, '5', 'Test 5', 'Alice', 'alice@example.com',
                           ['dave@example.com'], 25),
                # Bob sent 3 emails.
                _email_row(now, '6', 'Test 6', 'Bob', 'bob@example.com',
                           ['alice@example.com'], 3),
                _email_row(now, '7', 'Test 7', 'Bob', 'bob@example.com',
                           ['alice@example.com', 'charlie@example.com'], 8),
                _email_row(now, '8', 'Test 8', 'Bob', 'bob@example.com',
                           ['dave@example.com'], 12),
                # Charlie sent 2 emails.
                _email_row(now, '9', 'Test 9', 'Charlie',
                           'charlie@example.com', ['alice@example.com'], 7),
                _email_row(now, '10', 'Test 10', 'Charlie',
                           'charlie@example.com', ['bob@example.com'], 18),
                # One email older than 30 days (expected to be filtered out).
                _email_row(now, '11', 'Old', 'Old Person', 'old@example.com',
                           ['someone@example.com'], 35),
                # One deleted email (expected to be filtered out).
                _email_row(now, '12', 'Deleted', 'Deleted Person',
                           'deleted@example.com', ['someone@example.com'], 5,
                           is_deleted=True),
            ]

            # ``closing`` guarantees the connection is released even when a
            # statement raises; sqlite3's own context manager only manages
            # the transaction, not the connection lifetime.
            with closing(sqlite3.connect(db_path)) as conn:
                conn.execute(_CREATE_EMAILS_TABLE_SQL)
                conn.executemany(_INSERT_EMAIL_SQL, test_data)
                conn.commit()

            yield db_path
        finally:
            # Cleanup runs on normal teardown and on setup failure alike.
            if Path(db_path).exists():
                os.unlink(db_path)

    def test_analyze_contacts_basic(self, temp_db):
        """Basic analysis returns the expected totals and sender ranking."""
        analyzer = ContactAnalyzer(temp_db)
        result = analyzer.analyze_contacts(days=30, limit=10, group_by='both')

        assert 'error' not in result
        # The deleted email and the one older than 30 days are excluded.
        assert result['total_emails_analyzed'] == 10
        assert 'top_senders' in result
        assert 'top_recipients' in result

        # Expected ranking: Alice 5, Bob 3, Charlie 2.
        top_senders = result['top_senders']
        assert len(top_senders) >= 3
        assert top_senders[0]['email'] == 'alice@example.com'
        assert top_senders[0]['count'] == 5
        assert top_senders[1]['email'] == 'bob@example.com'
        assert top_senders[1]['count'] == 3

    def test_analyze_contacts_sender_only(self, temp_db):
        """``group_by='sender'`` reports senders only."""
        analyzer = ContactAnalyzer(temp_db)
        result = analyzer.analyze_contacts(days=30, limit=5, group_by='sender')

        assert 'top_senders' in result
        assert 'top_recipients' not in result

    def test_analyze_contacts_recipient_only(self, temp_db):
        """``group_by='recipient'`` reports recipients only."""
        analyzer = ContactAnalyzer(temp_db)
        result = analyzer.analyze_contacts(
            days=30, limit=5, group_by='recipient'
        )

        assert 'top_recipients' in result
        assert 'top_senders' not in result

    def test_analyze_contacts_with_limit(self, temp_db):
        """``limit`` caps the length of both ranking lists."""
        analyzer = ContactAnalyzer(temp_db)
        result = analyzer.analyze_contacts(days=30, limit=2, group_by='both')

        assert len(result['top_senders']) <= 2
        assert len(result['top_recipients']) <= 2

    def test_analyze_contacts_time_filter(self, temp_db):
        """A shorter time window analyzes fewer emails."""
        analyzer = ContactAnalyzer(temp_db)

        # The last 7 days should contain fewer emails than the last 30.
        result_7days = analyzer.analyze_contacts(
            days=7, limit=10, group_by='both'
        )
        result_30days = analyzer.analyze_contacts(
            days=30, limit=10, group_by='both'
        )

        assert (
            result_7days['total_emails_analyzed']
            < result_30days['total_emails_analyzed']
        )

    def test_analyze_contacts_summary(self, temp_db):
        """The result carries a summary with the expected statistics."""
        analyzer = ContactAnalyzer(temp_db)
        result = analyzer.analyze_contacts(days=30, limit=10, group_by='both')

        assert 'summary' in result
        summary = result['summary']
        assert 'unique_senders' in summary
        assert 'unique_recipients' in summary
        assert 'avg_emails_per_sender' in summary
        assert summary['unique_senders'] == 3  # Alice, Bob, Charlie

    def test_get_contact_timeline(self, temp_db):
        """The timeline for a known contact is non-empty and well-formed."""
        analyzer = ContactAnalyzer(temp_db)
        result = analyzer.get_contact_timeline('alice@example.com', days=30)

        assert 'error' not in result
        assert result['contact_email'] == 'alice@example.com'
        assert result['total_interactions'] > 0
        assert 'timeline' in result

        # Check the shape of a timeline entry.
        if result['timeline']:
            item = result['timeline'][0]
            assert 'date' in item
            assert 'subject' in item
            assert 'direction' in item
            assert item['direction'] in ['received', 'sent']

    def test_contact_timeline_direction(self, temp_db):
        """The timeline contains both 'received' and 'sent' entries."""
        analyzer = ContactAnalyzer(temp_db)
        result = analyzer.get_contact_timeline('alice@example.com', days=30)

        timeline = result['timeline']
        received_count = sum(
            1 for item in timeline if item['direction'] == 'received'
        )
        sent_count = sum(
            1 for item in timeline if item['direction'] == 'sent'
        )

        # In the fixture Alice is the sender of 5 emails and a recipient of
        # 3 (2 from Bob, 1 from Charlie).
        # NOTE(review): direction looks to be from the mailbox owner's point
        # of view ('received' = the contact is the sender) -- confirm
        # against ContactAnalyzer.
        assert received_count > 0  # Alice as sender
        assert sent_count > 0  # Alice as recipient

    @pytest.mark.skip(
        reason='convenience functions need the real email_sync.db or a '
               'mocked EMAIL_SYNC_DB path'
    )
    def test_convenience_functions(self, temp_db):
        """Placeholder for the module-level convenience functions.

        Skipped explicitly rather than silently passing: the original note
        says these functions need the actual email_sync.db (or a mock of the
        EMAIL_SYNC_DB path), which this test does not provide yet.
        """

    def test_empty_database(self):
        """Analyzing an empty ``emails`` table yields zero counts."""
        fd, db_path = tempfile.mkstemp(suffix='.db')
        os.close(fd)

        try:
            with closing(sqlite3.connect(db_path)) as conn:
                conn.execute("""
                    CREATE TABLE emails (
                        id INTEGER PRIMARY KEY,
                        account_id TEXT,
                        folder_id INTEGER,
                        uid TEXT,
                        message_id TEXT,
                        subject TEXT,
                        sender TEXT,
                        sender_email TEXT,
                        recipients TEXT,
                        date_sent TIMESTAMP,
                        is_deleted BOOLEAN DEFAULT FALSE
                    )
                """)
                conn.commit()

            analyzer = ContactAnalyzer(db_path)
            result = analyzer.analyze_contacts(days=30, limit=10)

            assert result['total_emails_analyzed'] == 0
            assert result['top_senders'] == []
            assert result['top_recipients'] == []
        finally:
            # Remove the temp file even when an assertion above fails.
            os.unlink(db_path)

    def test_json_recipients_error_handling(self, temp_db):
        """A row whose ``recipients`` is not valid JSON does not break analysis."""
        # Add one email whose recipients column is not valid JSON.
        with closing(sqlite3.connect(temp_db)) as conn:
            conn.execute(
                _INSERT_EMAIL_SQL,
                (
                    'test_account', 1, '100', 'msg100', 'Invalid JSON',
                    'Test', 'test@example.com', 'invalid,json,string',
                    datetime.now().isoformat(), False,
                ),
            )
            conn.commit()

        analyzer = ContactAnalyzer(temp_db)
        result = analyzer.analyze_contacts(days=30, limit=10)

        # Analysis should complete without reporting an error.
        assert 'error' not in result
        assert result['total_emails_analyzed'] >= 10


if __name__ == '__main__':
    # Propagate pytest's exit status so a failing run is visible to callers.
    raise SystemExit(pytest.main([__file__, '-v']))

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leeguooooo/email-mcp-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.