Skip to main content
Glama
leeguooooo
by leeguooooo
contact_analysis.py — 9.61 kB
""" 联系人频率分析模块 """ import sqlite3 import logging from typing import Dict, Any, List, Optional from datetime import datetime, timedelta from collections import Counter import json from ..config.paths import EMAIL_SYNC_DB logger = logging.getLogger(__name__) class ContactAnalyzer: """联系人频率分析器""" def __init__(self, db_path: str = None): """初始化分析器""" self.db_path = db_path or EMAIL_SYNC_DB def analyze_contacts( self, account_id: Optional[str] = None, days: int = 30, limit: int = 10, group_by: str = "both" # sender, recipient, both ) -> Dict[str, Any]: """ 分析联系人频率 Args: account_id: 账户ID,None表示分析所有账户 days: 分析最近N天的数据 limit: 返回Top N联系人 group_by: 分组方式(sender/recipient/both) Returns: 分析结果字典 """ try: conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row # 计算时间范围 cutoff_date = datetime.now() - timedelta(days=days) cutoff_date_str = cutoff_date.isoformat() # 构建查询条件 where_clauses = ["date_sent >= ?", "is_deleted = FALSE"] params = [cutoff_date_str] if account_id: where_clauses.append("account_id = ?") params.append(account_id) where_sql = " AND ".join(where_clauses) # 查询邮件数据 query = f""" SELECT sender_email, recipients FROM emails WHERE {where_sql} """ cursor = conn.execute(query, params) rows = cursor.fetchall() # 统计发件人和收件人频率 sender_counter = Counter() recipient_counter = Counter() for row in rows: sender = row['sender_email'] if sender: sender_counter[sender.lower()] += 1 # 解析收件人(JSON格式) recipients_json = row['recipients'] if recipients_json: try: recipients = json.loads(recipients_json) except json.JSONDecodeError: recipients = recipients_json def _extract_addresses(value) -> List[str]: """Normalize various recipient structures to plain email strings.""" addresses: List[str] = [] if isinstance(value, str): addresses.extend([addr.strip() for addr in value.split(',') if addr.strip()]) elif isinstance(value, dict): # Common structure: {"email": "..."} or {"address": "..."} or grouped by type email_val = value.get("email") or value.get("address") if email_val: 
addresses.append(email_val.strip()) else: for nested in value.values(): addresses.extend(_extract_addresses(nested)) elif isinstance(value, list): for item in value: addresses.extend(_extract_addresses(item)) return addresses for recipient in _extract_addresses(recipients): if recipient: recipient_counter[recipient.lower()] += 1 # 准备结果 result = { "analysis_period": { "days": days, "start_date": cutoff_date_str, "end_date": datetime.now().isoformat() }, "account_id": account_id or "all_accounts", "total_emails_analyzed": len(rows), } # 根据 group_by 返回结果 if group_by in ["sender", "both"]: top_senders = sender_counter.most_common(limit) result["top_senders"] = [ { "email": email, "count": count, "percentage": round(count / len(rows) * 100, 2) if rows else 0 } for email, count in top_senders ] if group_by in ["recipient", "both"]: top_recipients = recipient_counter.most_common(limit) result["top_recipients"] = [ { "email": email, "count": count, "percentage": round(count / len(rows) * 100, 2) if rows else 0 } for email, count in top_recipients ] # 添加统计摘要 result["summary"] = { "unique_senders": len(sender_counter), "unique_recipients": len(recipient_counter), "avg_emails_per_sender": round(sum(sender_counter.values()) / len(sender_counter), 2) if sender_counter else 0, "avg_emails_per_recipient": round(sum(recipient_counter.values()) / len(recipient_counter), 2) if recipient_counter else 0 } conn.close() return result except sqlite3.Error as e: logger.error(f"Database error in contact analysis: {e}") return { "error": f"Database error: {str(e)}", "account_id": account_id or "all_accounts" } except Exception as e: logger.error(f"Error analyzing contacts: {e}") return { "error": f"Analysis error: {str(e)}", "account_id": account_id or "all_accounts" } def get_contact_timeline( self, contact_email: str, account_id: Optional[str] = None, days: int = 90 ) -> Dict[str, Any]: """ 获取特定联系人的沟通时间线 Args: contact_email: 联系人邮箱 account_id: 账户ID days: 时间范围 Returns: 时间线数据 """ try: conn = 
sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cutoff_date = datetime.now() - timedelta(days=days) cutoff_date_str = cutoff_date.isoformat() where_clauses = [ "date_sent >= ?", "is_deleted = FALSE", "(sender_email = ? OR recipients LIKE ?)" ] params = [cutoff_date_str, contact_email.lower(), f"%{contact_email.lower()}%"] if account_id: where_clauses.append("account_id = ?") params.append(account_id) where_sql = " AND ".join(where_clauses) query = f""" SELECT date_sent, subject, sender_email, is_read FROM emails WHERE {where_sql} ORDER BY date_sent DESC LIMIT 100 """ cursor = conn.execute(query, params) rows = cursor.fetchall() timeline = [] for row in rows: timeline.append({ "date": row['date_sent'], "subject": row['subject'], "direction": "received" if row['sender_email'].lower() == contact_email.lower() else "sent", "is_read": bool(row['is_read']) }) result = { "contact_email": contact_email, "account_id": account_id or "all_accounts", "total_interactions": len(timeline), "timeline": timeline } conn.close() return result except Exception as e: logger.error(f"Error getting contact timeline: {e}") return { "error": f"Timeline error: {str(e)}", "contact_email": contact_email } def analyze_contacts( account_id: Optional[str] = None, days: int = 30, limit: int = 10, group_by: str = "both" ) -> Dict[str, Any]: """ 分析联系人频率(便捷函数) Args: account_id: 账户ID days: 分析天数 limit: Top N group_by: 分组方式 Returns: 分析结果 """ analyzer = ContactAnalyzer() return analyzer.analyze_contacts(account_id, days, limit, group_by) def get_contact_timeline( contact_email: str, account_id: Optional[str] = None, days: int = 90 ) -> Dict[str, Any]: """ 获取联系人时间线(便捷函数) """ analyzer = ContactAnalyzer() return analyzer.get_contact_timeline(contact_email, account_id, days)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leeguooooo/email-mcp-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server