Skip to main content
Glama
vitalune

Personal Knowledge Assistant

by vitalune
analysis_tools.py35.3 kB
""" Communication and Behavioral Analysis Tools This module provides comprehensive analysis capabilities for personal data: - Communication pattern analysis - Network analysis and relationship mapping - Content analysis and insights - Productivity pattern detection - Behavioral trend analysis - Privacy-preserving analytics """ import asyncio import json from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple, Any, Union from dataclasses import dataclass, field from collections import defaultdict, Counter from enum import Enum import pandas as pd import numpy as np import networkx as nx from scipy import stats import structlog from ..models.data_models import ( EmailMessage, EmailThread, SocialMediaPost, CommunicationPattern, TimeRange, InsightReport ) from ..utils.nlp_processor import get_nlp_processor, TextAnalysisResult from ..utils.analytics_engine import get_analytics_engine, TimeSeriesPoint from ..tools.search_tools import get_search_engine, SearchScope from ..config.settings import get_settings logger = structlog.get_logger(__name__) class AnalysisType(str, Enum): """Types of analysis that can be performed""" COMMUNICATION_PATTERNS = "communication_patterns" NETWORK_ANALYSIS = "network_analysis" CONTENT_ANALYSIS = "content_analysis" PRODUCTIVITY_ANALYSIS = "productivity_analysis" BEHAVIORAL_TRENDS = "behavioral_trends" SENTIMENT_ANALYSIS = "sentiment_analysis" class CommunicationDirection(str, Enum): """Direction of communication""" INCOMING = "incoming" OUTGOING = "outgoing" BIDIRECTIONAL = "bidirectional" @dataclass class CommunicationMetrics: """Metrics for communication analysis""" total_messages: int avg_response_time_hours: float response_rate: float initiated_conversations: int received_conversations: int peak_hours: List[int] peak_days: List[int] avg_message_length: float sentiment_distribution: Dict[str, int] @dataclass class ContactAnalysis: """Analysis results for a specific contact""" contact_id: str contact_name: str email_address: Optional[str] relationship_strength: float communication_frequency: float response_patterns: CommunicationMetrics topics: List[Tuple[str, float]] sentiment_trend: List[Tuple[datetime, float]] communication_style: str importance_score: float @dataclass class NetworkNode: """Represents a node in the communication network""" id: str name: str email: Optional[str] centrality_score: float cluster_id: int metadata: Dict[str, Any] = field(default_factory=dict) @dataclass class NetworkEdge: """Represents an edge in the communication network""" source: str target: str weight: float interaction_count: int last_interaction: datetime avg_response_time: float sentiment_score: float @dataclass class NetworkAnalysis: """Results of network analysis""" nodes: List[NetworkNode] edges: List[NetworkEdge] clusters: Dict[int, List[str]] key_connectors: List[str] isolated_contacts: List[str] network_density: float average_clustering: float @dataclass class ProductivityInsight: """Productivity analysis insight""" metric_name: str current_value: float trend_direction: str confidence: float recommendation: str time_period: str @dataclass class BehavioralPattern: """Represents a detected behavioral pattern""" pattern_type: str description: str frequency: str strength: float examples: List[Dict[str, Any]] trend: str detected_at: datetime class CommunicationAnalyzer: """Analyzes communication patterns and relationships""" def __init__(self): self.settings = get_settings() self.nlp_processor = None self.analytics_engine = None self.search_engine = None # Analysis configuration self._min_interactions_for_analysis = 3 self._response_time_threshold_hours = 48 self._strong_relationship_threshold = 0.7 # Privacy settings self._anonymize_contacts = self.settings.privacy.anonymize_logs async def initialize(self): """Initialize analyzer dependencies""" self.nlp_processor = await get_nlp_processor() self.analytics_engine = get_analytics_engine() self.search_engine = await get_search_engine() async def analyze_communication_patterns( self, time_range: TimeRange, contacts: Optional[List[str]] = None, include_sentiment: bool = True ) -> Dict[str, Any]: """Analyze overall communication patterns""" if not self.nlp_processor: await self.initialize() logger.info("Starting communication pattern analysis", time_range=time_range) # Fetch communication data emails = await self._fetch_emails_in_range(time_range) if not emails: return {"error": "No communication data found in the specified time range"} # Analyze patterns results = { "time_range": { "start": time_range.start.isoformat(), "end": time_range.end.isoformat() }, "overview": await self._analyze_communication_overview(emails), "temporal_patterns": await self._analyze_temporal_patterns(emails), "response_patterns": await self._analyze_response_patterns(emails), "contact_analysis": await self._analyze_top_contacts(emails, limit=20) } if include_sentiment: results["sentiment_analysis"] = await self._analyze_communication_sentiment(emails) return results async def analyze_contact_relationship( self, contact_identifier: str, time_range: Optional[TimeRange] = None ) -> ContactAnalysis: """Analyze relationship with a specific contact""" if not time_range: # Default to last 6 months end_date = datetime.now() start_date = end_date - timedelta(days=180) time_range = TimeRange(start=start_date, end=end_date) # Fetch communication with this contact emails = await self._fetch_emails_with_contact(contact_identifier, time_range) if not emails: logger.warning(f"No communication found with contact: {contact_identifier}") return ContactAnalysis( contact_id=contact_identifier, contact_name="Unknown", email_address=contact_identifier if "@" in contact_identifier else None, relationship_strength=0.0, communication_frequency=0.0, response_patterns=CommunicationMetrics( total_messages=0, avg_response_time_hours=0.0, response_rate=0.0, initiated_conversations=0, received_conversations=0, peak_hours=[], peak_days=[], avg_message_length=0.0, sentiment_distribution={} ), topics=[], sentiment_trend=[], communication_style="unknown", importance_score=0.0 ) # Analyze the relationship relationship_strength = self._calculate_relationship_strength(emails) communication_frequency = len(emails) / ((time_range.end - time_range.start).days / 7) # per week response_patterns = await self._analyze_contact_response_patterns(emails) topics = await self._extract_communication_topics(emails) sentiment_trend = await self._analyze_sentiment_trend(emails) communication_style = self._classify_communication_style(emails) importance_score = self._calculate_contact_importance(emails, relationship_strength) return ContactAnalysis( contact_id=contact_identifier, contact_name=self._extract_contact_name(emails), email_address=contact_identifier if "@" in contact_identifier else None, relationship_strength=relationship_strength, communication_frequency=communication_frequency, response_patterns=response_patterns, topics=topics, sentiment_trend=sentiment_trend, communication_style=communication_style, importance_score=importance_score ) async def analyze_communication_network( self, time_range: TimeRange, min_interactions: int = 3 ) -> NetworkAnalysis: """Analyze communication network structure""" emails = await self._fetch_emails_in_range(time_range) if not emails: return NetworkAnalysis([], [], {}, [], [], 0.0, 0.0) # Build network graph G = nx.Graph() # Extract contacts and interactions contact_interactions = defaultdict(lambda: defaultdict(list)) for email in emails: sender = email.get('from', '') recipients = email.get('to', []) # Add interactions for recipient in recipients: if sender != recipient: # Avoid self-loops contact_interactions[sender][recipient].append(email) contact_interactions[recipient][sender].append(email) # Build graph for contact1, interactions in contact_interactions.items(): for contact2, interaction_list in interactions.items(): if len(interaction_list) >= min_interactions: weight = len(interaction_list) avg_response_time = self._calculate_avg_response_time(interaction_list) sentiment_score = await self._calculate_interaction_sentiment(interaction_list) G.add_edge( contact1, contact2, weight=weight, avg_response_time=avg_response_time, sentiment_score=sentiment_score ) # Calculate network metrics nodes = [] edges = [] # Calculate centrality scores if len(G.nodes()) > 0: centrality_scores = nx.degree_centrality(G) clustering_coeffs = nx.clustering(G) # Detect communities try: communities = nx.community.greedy_modularity_communities(G) cluster_mapping = {} for i, community in enumerate(communities): for node in community: cluster_mapping[node] = i except: cluster_mapping = {node: 0 for node in G.nodes()} # Create node objects for node_id in G.nodes(): node = NetworkNode( id=self._anonymize_contact(node_id) if self._anonymize_contacts else node_id, name=self._extract_name_from_email(node_id), email=node_id if "@" in node_id else None, centrality_score=centrality_scores.get(node_id, 0.0), cluster_id=cluster_mapping.get(node_id, 0), metadata={ 'clustering_coefficient': clustering_coeffs.get(node_id, 0.0), 'degree': G.degree(node_id) } ) nodes.append(node) # Create edge objects for edge in G.edges(data=True): source, target, data = edge edge_obj = NetworkEdge( source=self._anonymize_contact(source) if self._anonymize_contacts else source, target=self._anonymize_contact(target) if self._anonymize_contacts else target, weight=data.get('weight', 1.0), interaction_count=data.get('weight', 1), last_interaction=datetime.now(), # Would need to track from actual data avg_response_time=data.get('avg_response_time', 0.0), sentiment_score=data.get('sentiment_score', 0.0) ) edges.append(edge_obj) # Identify key connectors (high centrality) key_connectors = [ node.id for node in sorted(nodes, key=lambda x: x.centrality_score, reverse=True)[:5] ] # Identify isolated contacts (degree = 1 or 0) isolated_contacts = [ node.id for node in nodes if node.metadata.get('degree', 0) <= 1 ] # Group clusters clusters = defaultdict(list) for node in nodes: clusters[node.cluster_id].append(node.id) # Calculate network-level metrics network_density = nx.density(G) average_clustering = nx.average_clustering(G) else: key_connectors = [] isolated_contacts = [] clusters = {} network_density = 0.0 average_clustering = 0.0 return NetworkAnalysis( nodes=nodes, edges=edges, clusters=dict(clusters), key_connectors=key_connectors, isolated_contacts=isolated_contacts, network_density=network_density, average_clustering=average_clustering ) async def analyze_productivity_patterns( self, time_range: TimeRange, metrics: Optional[List[str]] = None ) -> List[ProductivityInsight]: """Analyze productivity patterns and provide insights""" insights = [] # Default metrics to analyze if not metrics: metrics = ['email_volume', 'response_time', 'work_hours', 'focus_time'] # Fetch relevant data emails = await self._fetch_emails_in_range(time_range) for metric in metrics: insight = await self._analyze_productivity_metric(metric, emails, time_range) if insight: insights.append(insight) return insights async def detect_behavioral_patterns( self, time_range: TimeRange, pattern_types: Optional[List[str]] = None ) -> List[BehavioralPattern]: """Detect behavioral patterns in communication and activity""" patterns = [] if not pattern_types: pattern_types = ['response_time', 'communication_volume', 'topic_shifts', 'sentiment_changes'] # Fetch communication data emails = await self._fetch_emails_in_range(time_range) for pattern_type in pattern_types: detected_patterns = await self._detect_pattern_type(pattern_type, emails, time_range) patterns.extend(detected_patterns) return patterns # Helper methods for data fetching async def _fetch_emails_in_range(self, time_range: TimeRange) -> List[Dict[str, Any]]: """Fetch emails within a time range""" # This would integrate with the actual email client # For now, return mock data structure return [] async def _fetch_emails_with_contact( self, contact: str, time_range: TimeRange ) -> List[Dict[str, Any]]: """Fetch emails with a specific contact""" # This would integrate with the actual email client return [] # Analysis helper methods async def _analyze_communication_overview( self, emails: List[Dict[str, Any]] ) -> Dict[str, Any]: """Analyze overall communication statistics""" total_emails = len(emails) sent_emails = len([e for e in emails if e.get('direction') == 'sent']) received_emails = total_emails - sent_emails # Calculate daily averages if emails: date_range = ( max(e.get('date', datetime.now()) for e in emails) - min(e.get('date', datetime.now()) for e in emails) ).days daily_average = total_emails / max(1, date_range) else: daily_average = 0.0 return { "total_emails": total_emails, "sent_emails": sent_emails, "received_emails": received_emails, "daily_average": daily_average, "send_receive_ratio": sent_emails / max(1, received_emails) } async def _analyze_temporal_patterns( self, emails: List[Dict[str, Any]] ) -> Dict[str, Any]: """Analyze temporal patterns in communication""" if not emails: return {} # Extract temporal data hours = [] days_of_week = [] for email in emails: date = email.get('date', datetime.now()) hours.append(date.hour) days_of_week.append(date.weekday()) # Find peaks hour_counts = Counter(hours) dow_counts = Counter(days_of_week) peak_hours = [hour for hour, count in hour_counts.most_common(3)] peak_days = [day for day, count in dow_counts.most_common(3)] return { "peak_hours": peak_hours, "peak_days_of_week": peak_days, "hourly_distribution": dict(hour_counts), "daily_distribution": dict(dow_counts), "total_active_hours": len(set(hours)), "communication_span": { "earliest_hour": min(hours) if hours else 0, "latest_hour": max(hours) if hours else 0 } } async def _analyze_response_patterns( self, emails: List[Dict[str, Any]] ) -> Dict[str, Any]: """Analyze response time patterns""" response_times = [] # Group emails by thread to calculate response times threads = defaultdict(list) for email in emails: thread_id = email.get('thread_id', email.get('id')) threads[thread_id].append(email) for thread_emails in threads.values(): # Sort by date thread_emails.sort(key=lambda x: x.get('date', datetime.now())) for i in range(1, len(thread_emails)): prev_email = thread_emails[i-1] curr_email = thread_emails[i] # Calculate response time if direction changed if (prev_email.get('direction') != curr_email.get('direction')): response_time = ( curr_email.get('date', datetime.now()) - prev_email.get('date', datetime.now()) ).total_seconds() / 3600 # Convert to hours if response_time > 0 and response_time < 168: # Less than a week response_times.append(response_time) if response_times: avg_response_time = np.mean(response_times) median_response_time = np.median(response_times) response_time_std = np.std(response_times) else: avg_response_time = median_response_time = response_time_std = 0.0 return { "average_response_time_hours": avg_response_time, "median_response_time_hours": median_response_time, "response_time_std": response_time_std, "quick_responses": len([rt for rt in response_times if rt < 1]), # < 1 hour "slow_responses": len([rt for rt in response_times if rt > 24]), # > 24 hours "response_rate": len(response_times) / max(1, len(emails) // 2) } async def _analyze_top_contacts( self, emails: List[Dict[str, Any]], limit: int = 10 ) -> List[Dict[str, Any]]: """Analyze top contacts by interaction frequency""" contact_stats = defaultdict(lambda: { 'email_count': 0, 'last_contact': None, 'avg_response_time': 0.0, 'sent_to_them': 0, 'received_from_them': 0 }) for email in emails: # Extract contacts contacts = [] if email.get('direction') == 'sent': contacts = email.get('to', []) else: contacts = [email.get('from', '')] for contact in contacts: if contact: stats = contact_stats[contact] stats['email_count'] += 1 email_date = email.get('date', datetime.now()) if not stats['last_contact'] or email_date > stats['last_contact']: stats['last_contact'] = email_date if email.get('direction') == 'sent': stats['sent_to_them'] += 1 else: stats['received_from_them'] += 1 # Sort by interaction frequency top_contacts = sorted( contact_stats.items(), key=lambda x: x[1]['email_count'], reverse=True )[:limit] # Format results results = [] for contact, stats in top_contacts: contact_display = self._anonymize_contact(contact) if self._anonymize_contacts else contact results.append({ 'contact': contact_display, 'name': self._extract_name_from_email(contact), 'total_emails': stats['email_count'], 'sent_to_them': stats['sent_to_them'], 'received_from_them': stats['received_from_them'], 'last_contact': stats['last_contact'].isoformat() if stats['last_contact'] else None, 'interaction_balance': stats['sent_to_them'] / max(1, stats['received_from_them']) }) return results async def _analyze_communication_sentiment( self, emails: List[Dict[str, Any]] ) -> Dict[str, Any]: """Analyze sentiment in communications""" if not emails: return {} sentiments = [] sentiment_by_contact = defaultdict(list) for email in emails: content = email.get('body', '') or email.get('snippet', '') if content: # Analyze sentiment sentiment_score, sentiment_label = await self.nlp_processor._analyze_sentiment(content) sentiments.append(sentiment_score) # Track by contact contact = email.get('from', '') if email.get('direction') == 'received' else email.get('to', [''])[0] if contact: sentiment_by_contact[contact].append(sentiment_score) if sentiments: avg_sentiment = np.mean(sentiments) sentiment_std = np.std(sentiments) positive_ratio = len([s for s in sentiments if s > 0.1]) / len(sentiments) negative_ratio = len([s for s in sentiments if s < -0.1]) / len(sentiments) else: avg_sentiment = sentiment_std = positive_ratio = negative_ratio = 0.0 # Find most positive and negative contacts contact_sentiments = {} for contact, scores in sentiment_by_contact.items(): if len(scores) >= 3: # Minimum interactions for reliable sentiment contact_sentiments[contact] = np.mean(scores) most_positive = max(contact_sentiments.items(), key=lambda x: x[1]) if contact_sentiments else None most_negative = min(contact_sentiments.items(), key=lambda x: x[1]) if contact_sentiments else None return { "overall_sentiment": avg_sentiment, "sentiment_variability": sentiment_std, "positive_communication_ratio": positive_ratio, "negative_communication_ratio": negative_ratio, "neutral_communication_ratio": 1 - positive_ratio - negative_ratio, "most_positive_contact": most_positive[0] if most_positive else None, "most_negative_contact": most_negative[0] if most_negative else None, "sentiment_trend": [] # Would calculate trend over time } # Additional helper methods def _calculate_relationship_strength(self, emails: List[Dict[str, Any]]) -> float: """Calculate relationship strength based on interaction patterns""" if not emails: return 0.0 # Factors for relationship strength frequency_score = min(1.0, len(emails) / 50.0) # Normalize to max 50 emails # Bidirectional communication sent_count = len([e for e in emails if e.get('direction') == 'sent']) received_count = len(emails) - sent_count balance_score = 1.0 - abs(sent_count - received_count) / len(emails) # Recency if emails: latest_email = max(emails, key=lambda x: x.get('date', datetime.min)) days_since_last = (datetime.now() - latest_email.get('date', datetime.now())).days recency_score = max(0.0, 1.0 - days_since_last / 365.0) # Decay over a year else: recency_score = 0.0 # Combine scores return (frequency_score * 0.4 + balance_score * 0.3 + recency_score * 0.3) def _anonymize_contact(self, contact: str) -> str: """Anonymize contact information for privacy""" if "@" in contact: # Hash email address import hashlib return f"contact_{hashlib.sha256(contact.encode()).hexdigest()[:8]}" else: return f"contact_{hash(contact) % 10000:04d}" def _extract_name_from_email(self, email: str) -> str: """Extract display name from email address""" if "@" in email: local_part = email.split("@")[0] # Clean up common email patterns name = local_part.replace(".", " ").replace("_", " ").replace("-", " ") return name.title() return email def _extract_contact_name(self, emails: List[Dict[str, Any]]) -> str: """Extract the most common name for a contact from emails""" names = [] for email in emails: from_name = email.get('from_name', '') if from_name: names.append(from_name) if names: # Return most common name return Counter(names).most_common(1)[0][0] else: # Fallback to extracting from email address if emails: email_addr = emails[0].get('from', '') or emails[0].get('to', [''])[0] return self._extract_name_from_email(email_addr) return "Unknown" async def _analyze_contact_response_patterns( self, emails: List[Dict[str, Any]] ) -> CommunicationMetrics: """Analyze response patterns for a specific contact""" # Implementation would analyze response times, frequency, etc. # This is a simplified version return CommunicationMetrics( total_messages=len(emails), avg_response_time_hours=12.0, # Placeholder response_rate=0.8, initiated_conversations=5, received_conversations=8, peak_hours=[9, 14, 16], peak_days=[1, 2, 3], # Monday, Tuesday, Wednesday avg_message_length=150.0, sentiment_distribution={'positive': 5, 'neutral': 8, 'negative': 2} ) async def _extract_communication_topics( self, emails: List[Dict[str, Any]] ) -> List[Tuple[str, float]]: """Extract main topics from communication with a contact""" all_content = [] for email in emails: content = email.get('body', '') or email.get('snippet', '') if content: all_content.append(content) if all_content: combined_content = ' '.join(all_content) keywords = await self.nlp_processor._extract_keywords(combined_content, max_keywords=5) return keywords return [] async def _analyze_sentiment_trend( self, emails: List[Dict[str, Any]] ) -> List[Tuple[datetime, float]]: """Analyze sentiment trend over time""" sentiment_data = [] for email in emails: content = email.get('body', '') or email.get('snippet', '') if content: sentiment_score, _ = await self.nlp_processor._analyze_sentiment(content) date = email.get('date', datetime.now()) sentiment_data.append((date, sentiment_score)) # Sort by date sentiment_data.sort(key=lambda x: x[0]) return sentiment_data def _classify_communication_style(self, emails: List[Dict[str, Any]]) -> str: """Classify communication style based on content analysis""" # This would analyze language patterns, formality, etc. # Simplified implementation if len(emails) < 3: return "insufficient_data" # Analyze average message length lengths = [] for email in emails: content = email.get('body', '') or email.get('snippet', '') lengths.append(len(content.split()) if content else 0) if lengths: avg_length = np.mean(lengths) if avg_length > 100: return "formal" elif avg_length > 50: return "conversational" else: return "brief" return "unknown" def _calculate_contact_importance( self, emails: List[Dict[str, Any]], relationship_strength: float ) -> float: """Calculate importance score for a contact""" # Combine various factors frequency_factor = min(1.0, len(emails) / 20.0) relationship_factor = relationship_strength # Check for business indicators business_keywords = ['meeting', 'project', 'deadline', 'client', 'proposal'] business_score = 0.0 for email in emails: content = (email.get('body', '') + ' ' + email.get('subject', '')).lower() business_mentions = sum(1 for keyword in business_keywords if keyword in content) business_score += business_mentions business_factor = min(1.0, business_score / (len(emails) * len(business_keywords))) return (frequency_factor * 0.4 + relationship_factor * 0.4 + business_factor * 0.2) def _calculate_avg_response_time(self, interactions: List[Dict[str, Any]]) -> float: """Calculate average response time for interactions""" # Simplified implementation return 12.0 # 12 hours average async def _calculate_interaction_sentiment(self, interactions: List[Dict[str, Any]]) -> float: """Calculate average sentiment for interactions""" sentiments = [] for interaction in interactions: content = interaction.get('body', '') or interaction.get('snippet', '') if content: sentiment_score, _ = await self.nlp_processor._analyze_sentiment(content) sentiments.append(sentiment_score) return np.mean(sentiments) if sentiments else 0.0 async def _analyze_productivity_metric( self, metric: str, emails: List[Dict[str, Any]], time_range: TimeRange ) -> Optional[ProductivityInsight]: """Analyze a specific productivity metric""" # This would implement specific productivity metrics # Placeholder implementation return ProductivityInsight( metric_name=metric, current_value=75.0, trend_direction="stable", confidence=0.7, recommendation=f"Your {metric} is within normal range", time_period=f"{time_range.start.date()} to {time_range.end.date()}" ) async def _detect_pattern_type( self, pattern_type: str, emails: List[Dict[str, Any]], time_range: TimeRange ) -> List[BehavioralPattern]: """Detect specific type of behavioral pattern""" # Placeholder implementation return [ BehavioralPattern( pattern_type=pattern_type, description=f"Detected {pattern_type} pattern", frequency="weekly", strength=0.6, examples=[], trend="stable", detected_at=datetime.now() ) ] # Global analyzer instance _communication_analyzer: Optional[CommunicationAnalyzer] = None async def get_communication_analyzer() -> CommunicationAnalyzer: """Get the global communication analyzer instance""" global _communication_analyzer if _communication_analyzer is None: _communication_analyzer = CommunicationAnalyzer() await _communication_analyzer.initialize() return _communication_analyzer async def quick_communication_analysis( time_range: TimeRange, analysis_types: Optional[List[AnalysisType]] = None ) -> Dict[str, Any]: """Quick communication analysis for specified time range""" analyzer = await get_communication_analyzer() if not analysis_types: analysis_types = [AnalysisType.COMMUNICATION_PATTERNS] results = {} for analysis_type in analysis_types: if analysis_type == AnalysisType.COMMUNICATION_PATTERNS: results['communication_patterns'] = await analyzer.analyze_communication_patterns(time_range) elif analysis_type == AnalysisType.NETWORK_ANALYSIS: results['network_analysis'] = await analyzer.analyze_communication_network(time_range) elif analysis_type == AnalysisType.PRODUCTIVITY_ANALYSIS: results['productivity_analysis'] = await analyzer.analyze_productivity_patterns(time_range) elif analysis_type == AnalysisType.BEHAVIORAL_TRENDS: results['behavioral_patterns'] = await analyzer.detect_behavioral_patterns(time_range) return results async def analyze_contact_relationship( contact_identifier: str, time_range: Optional[TimeRange] = None ) -> ContactAnalysis: """Analyze relationship with a specific contact""" analyzer = await get_communication_analyzer() return await analyzer.analyze_contact_relationship(contact_identifier, time_range)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vitalune/Nexus-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server