Personal Knowledge Assistant

by vitalune
search_tools.py (33.6 kB)
""" Cross-Platform Search and Retrieval Tools This module provides comprehensive search capabilities across multiple platforms: - Universal search across Gmail, Drive, and social media - Contextual search with relationship mapping - Smart filtering and ranking - Privacy-aware search with data minimization - Real-time and cached search results """ import asyncio import json import re from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple, Any, Union from dataclasses import dataclass, field from enum import Enum import pandas as pd from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity import structlog from ..models.data_models import ( EmailMessage, EmailThread, SocialMediaPost, Project, TimeRange, DataFilter, QueryParams ) from ..integrations.client_manager import get_client_manager from ..utils.nlp_processor import get_nlp_processor, TextAnalysisResult from ..utils.cache import get_cache_manager from ..config.settings import get_settings logger = structlog.get_logger(__name__) class SearchScope(str, Enum): """Search scope options""" ALL = "all" EMAIL = "email" DRIVE = "drive" SOCIAL = "social" DOCUMENTS = "documents" CACHED = "cached" class SortOrder(str, Enum): """Search result sorting options""" RELEVANCE = "relevance" DATE_DESC = "date_desc" DATE_ASC = "date_asc" IMPORTANCE = "importance" ENGAGEMENT = "engagement" class SearchResultType(str, Enum): """Types of search results""" EMAIL = "email" EMAIL_THREAD = "email_thread" DOCUMENT = "document" SOCIAL_POST = "social_post" PROJECT = "project" CONTACT = "contact" @dataclass class SearchFilter: """Advanced search filter""" field: str operator: str # eq, ne, contains, startswith, endswith, gt, lt, gte, lte, in value: Any weight: float = 1.0 @dataclass class SearchResult: """Universal search result""" id: str type: SearchResultType title: str snippet: str content: str metadata: Dict[str, Any] relevance_score: float timestamp: datetime source: str url: Optional[str] = None tags: List[str] = field(default_factory=list) @dataclass class SearchResults: """Collection of search results with metadata""" results: List[SearchResult] total_count: int search_time_ms: float query: str filters: List[SearchFilter] facets: Dict[str, Dict[str, int]] suggestions: List[str] page: int page_size: int @dataclass class ContextualSearchResult: """Search result with contextual relationships""" primary_result: SearchResult related_results: List[SearchResult] relationship_type: str relationship_strength: float context_summary: str class UniversalSearchEngine: """Main search engine for cross-platform content""" def __init__(self): self.settings = get_settings() self.client_manager = None self.nlp_processor = None self.cache_manager = None # Search configuration self._max_results_per_source = 100 self._search_timeout_seconds = 30 self._enable_content_caching = True self._privacy_mode = self.settings.privacy.anonymize_logs # TF-IDF vectorizer for content similarity self._vectorizer = TfidfVectorizer( max_features=5000, stop_words='english', ngram_range=(1, 2), max_df=0.8, min_df=2 ) self._content_vectors = {} self._indexed_content = [] async def initialize(self): """Initialize search engine dependencies""" self.client_manager = await get_client_manager() self.nlp_processor = await get_nlp_processor() self.cache_manager = get_cache_manager() async def universal_search( self, query: str, scope: SearchScope = SearchScope.ALL, filters: Optional[List[SearchFilter]] = None, sort_by: 
SortOrder = SortOrder.RELEVANCE, page: int = 1, page_size: int = 20, include_facets: bool = True ) -> SearchResults: """Perform universal search across all platforms""" if not self.client_manager: await self.initialize() start_time = datetime.now() filters = filters or [] logger.info(f"Starting universal search", query=query, scope=scope.value) # Determine which sources to search sources_to_search = self._get_search_sources(scope) # Perform concurrent searches across sources search_tasks = [] for source in sources_to_search: task = self._search_source(source, query, filters) search_tasks.append(task) # Collect results from all sources source_results = await asyncio.gather(*search_tasks, return_exceptions=True) # Combine and process results all_results = [] for i, result in enumerate(source_results): if isinstance(result, Exception): logger.warning(f"Search failed for source {sources_to_search[i]}: {result}") continue if isinstance(result, list): all_results.extend(result) # Apply additional filtering filtered_results = self._apply_filters(all_results, filters) # Calculate relevance scores scored_results = await self._calculate_relevance_scores(filtered_results, query) # Sort results sorted_results = self._sort_results(scored_results, sort_by) # Generate facets facets = {} if include_facets: facets = self._generate_facets(sorted_results) # Pagination start_idx = (page - 1) * page_size end_idx = start_idx + page_size paginated_results = sorted_results[start_idx:end_idx] # Generate search suggestions suggestions = await self._generate_search_suggestions(query, sorted_results) # Calculate search time search_time = (datetime.now() - start_time).total_seconds() * 1000 return SearchResults( results=paginated_results, total_count=len(sorted_results), search_time_ms=search_time, query=query, filters=filters, facets=facets, suggestions=suggestions, page=page, page_size=page_size ) def _get_search_sources(self, scope: SearchScope) -> List[str]: """Determine which sources to search based on scope""" if scope == SearchScope.ALL: return ['email', 'drive', 'social', 'cached'] elif scope == SearchScope.EMAIL: return ['email'] elif scope == SearchScope.DRIVE: return ['drive'] elif scope == SearchScope.SOCIAL: return ['social'] elif scope == SearchScope.DOCUMENTS: return ['drive', 'cached'] elif scope == SearchScope.CACHED: return ['cached'] else: return ['cached'] async def _search_source( self, source: str, query: str, filters: List[SearchFilter] ) -> List[SearchResult]: """Search a specific source""" try: if source == 'email': return await self._search_emails(query, filters) elif source == 'drive': return await self._search_drive(query, filters) elif source == 'social': return await self._search_social(query, filters) elif source == 'cached': return await self._search_cached(query, filters) else: logger.warning(f"Unknown search source: {source}") return [] except Exception as e: logger.error(f"Error searching {source}: {e}") return [] async def _search_emails( self, query: str, filters: List[SearchFilter] ) -> List[SearchResult]: """Search Gmail messages""" results = [] try: gmail_client = await self.client_manager.get_client('gmail') if not gmail_client: logger.warning("Gmail client not available") return results # Build Gmail search query gmail_query = f"in:anywhere {query}" # Add filter-based query modifications for filter_obj in filters: if filter_obj.field == 'from' and filter_obj.operator == 'eq': gmail_query += f" from:{filter_obj.value}" elif filter_obj.field == 'subject' and filter_obj.operator 
== 'contains': gmail_query += f" subject:{filter_obj.value}" elif filter_obj.field == 'date' and filter_obj.operator == 'gte': if isinstance(filter_obj.value, datetime): date_str = filter_obj.value.strftime('%Y/%m/%d') gmail_query += f" after:{date_str}" # Search Gmail messages = await gmail_client.search_messages( query=gmail_query, max_results=self._max_results_per_source ) # Convert to SearchResult objects for message in messages: if isinstance(message, dict): result = SearchResult( id=message.get('id', ''), type=SearchResultType.EMAIL, title=message.get('subject', 'No Subject'), snippet=message.get('snippet', ''), content=message.get('body', ''), metadata={ 'from': message.get('from', ''), 'to': message.get('to', []), 'labels': message.get('labels', []), 'thread_id': message.get('thread_id', '') }, relevance_score=0.0, # Will be calculated later timestamp=message.get('date', datetime.now()), source='gmail', url=f"https://mail.google.com/mail/u/0/#inbox/{message.get('id', '')}" ) results.append(result) except Exception as e: logger.error(f"Gmail search failed: {e}") return results async def _search_drive( self, query: str, filters: List[SearchFilter] ) -> List[SearchResult]: """Search Google Drive files""" results = [] try: drive_client = await self.client_manager.get_client('drive') if not drive_client: logger.warning("Drive client not available") return results # Build Drive search query drive_query = f"fullText contains '{query}'" # Add filters for filter_obj in filters: if filter_obj.field == 'file_type' and filter_obj.operator == 'eq': drive_query += f" and mimeType = '{filter_obj.value}'" elif filter_obj.field == 'modified' and filter_obj.operator == 'gte': if isinstance(filter_obj.value, datetime): date_str = filter_obj.value.isoformat() drive_query += f" and modifiedTime >= '{date_str}'" # Search Drive files = await drive_client.search_files( query=drive_query, max_results=self._max_results_per_source ) # Convert to SearchResult objects for file_info in files: if isinstance(file_info, dict): result = SearchResult( id=file_info.get('id', ''), type=SearchResultType.DOCUMENT, title=file_info.get('name', 'Untitled'), snippet=file_info.get('description', '')[:200], content='', # Would need to fetch content separately metadata={ 'mime_type': file_info.get('mimeType', ''), 'size': file_info.get('size', 0), 'owners': file_info.get('owners', []), 'shared': file_info.get('shared', False) }, relevance_score=0.0, timestamp=file_info.get('modifiedTime', datetime.now()), source='drive', url=file_info.get('webViewLink', '') ) results.append(result) except Exception as e: logger.error(f"Drive search failed: {e}") return results async def _search_social( self, query: str, filters: List[SearchFilter] ) -> List[SearchResult]: """Search social media posts""" results = [] try: # Search Twitter twitter_client = await self.client_manager.get_client('twitter') if twitter_client: tweets = await twitter_client.search_tweets( query=query, max_results=self._max_results_per_source // 2 ) for tweet in tweets: if isinstance(tweet, dict): result = SearchResult( id=tweet.get('id', ''), type=SearchResultType.SOCIAL_POST, title=f"Tweet from {tweet.get('author', 'Unknown')}", snippet=tweet.get('text', '')[:200], content=tweet.get('text', ''), metadata={ 'platform': 'twitter', 'author': tweet.get('author', ''), 'likes': tweet.get('public_metrics', {}).get('like_count', 0), 'retweets': tweet.get('public_metrics', {}).get('retweet_count', 0) }, relevance_score=0.0, timestamp=tweet.get('created_at', datetime.now()), 
source='twitter', url=tweet.get('url', '') ) results.append(result) # Search LinkedIn linkedin_client = await self.client_manager.get_client('linkedin') if linkedin_client: posts = await linkedin_client.search_posts( query=query, max_results=self._max_results_per_source // 2 ) for post in posts: if isinstance(post, dict): result = SearchResult( id=post.get('id', ''), type=SearchResultType.SOCIAL_POST, title=f"LinkedIn post from {post.get('author', 'Unknown')}", snippet=post.get('text', '')[:200], content=post.get('text', ''), metadata={ 'platform': 'linkedin', 'author': post.get('author', ''), 'likes': post.get('likes', 0), 'comments': post.get('comments', 0) }, relevance_score=0.0, timestamp=post.get('created_at', datetime.now()), source='linkedin', url=post.get('url', '') ) results.append(result) except Exception as e: logger.error(f"Social media search failed: {e}") return results async def _search_cached( self, query: str, filters: List[SearchFilter] ) -> List[SearchResult]: """Search cached content""" results = [] try: # Search cached content cached_items = await self.cache_manager.search_cache(query) for item in cached_items: if isinstance(item, dict): result = SearchResult( id=item.get('id', ''), type=SearchResultType.DOCUMENT, title=item.get('title', 'Cached Item'), snippet=item.get('snippet', ''), content=item.get('content', ''), metadata=item.get('metadata', {}), relevance_score=0.0, timestamp=item.get('timestamp', datetime.now()), source='cache', url=item.get('url', '') ) results.append(result) except Exception as e: logger.error(f"Cache search failed: {e}") return results def _apply_filters( self, results: List[SearchResult], filters: List[SearchFilter] ) -> List[SearchResult]: """Apply additional filters to search results""" filtered_results = results for filter_obj in filters: filtered_results = self._apply_single_filter(filtered_results, filter_obj) return filtered_results def _apply_single_filter( self, results: List[SearchResult], filter_obj: SearchFilter ) -> List[SearchResult]: """Apply a single filter to results""" filtered = [] for result in results: value = self._get_filter_value(result, filter_obj.field) if self._filter_matches(value, filter_obj.operator, filter_obj.value): filtered.append(result) return filtered def _get_filter_value(self, result: SearchResult, field: str) -> Any: """Get the value for a filter field from a search result""" if field == 'type': return result.type.value elif field == 'source': return result.source elif field == 'timestamp': return result.timestamp elif field in result.metadata: return result.metadata[field] else: return None def _filter_matches(self, value: Any, operator: str, filter_value: Any) -> bool: """Check if a value matches a filter condition""" if value is None: return False try: if operator == 'eq': return value == filter_value elif operator == 'ne': return value != filter_value elif operator == 'contains': return str(filter_value).lower() in str(value).lower() elif operator == 'startswith': return str(value).lower().startswith(str(filter_value).lower()) elif operator == 'endswith': return str(value).lower().endswith(str(filter_value).lower()) elif operator == 'gt': return value > filter_value elif operator == 'lt': return value < filter_value elif operator == 'gte': return value >= filter_value elif operator == 'lte': return value <= filter_value elif operator == 'in': return value in filter_value else: return False except Exception: return False async def _calculate_relevance_scores( self, results: List[SearchResult], 
query: str ) -> List[SearchResult]: """Calculate relevance scores for search results""" if not results: return results # Prepare documents for TF-IDF documents = [] for result in results: # Combine title, snippet, and content for scoring doc_text = f"{result.title} {result.snippet} {result.content}" documents.append(doc_text) # Add query as first document for comparison documents.insert(0, query) try: # Calculate TF-IDF vectors tfidf_matrix = self._vectorizer.fit_transform(documents) # Calculate cosine similarity between query and each document query_vector = tfidf_matrix[0:1] doc_vectors = tfidf_matrix[1:] similarities = cosine_similarity(query_vector, doc_vectors).flatten() # Update relevance scores for i, result in enumerate(results): base_score = similarities[i] # Apply boosting factors boost_factor = 1.0 # Boost recent content days_old = (datetime.now() - result.timestamp).days if days_old < 7: boost_factor *= 1.2 elif days_old < 30: boost_factor *= 1.1 # Boost based on source authority if result.source == 'gmail': boost_factor *= 1.1 elif result.source == 'drive': boost_factor *= 1.05 # Boost based on engagement (for social posts) if result.type == SearchResultType.SOCIAL_POST: likes = result.metadata.get('likes', 0) shares = result.metadata.get('retweets', 0) + result.metadata.get('comments', 0) if likes + shares > 10: boost_factor *= 1.15 result.relevance_score = base_score * boost_factor except Exception as e: logger.warning(f"Failed to calculate relevance scores: {e}") # Fallback: assign equal scores for result in results: result.relevance_score = 0.5 return results def _sort_results( self, results: List[SearchResult], sort_by: SortOrder ) -> List[SearchResult]: """Sort search results""" if sort_by == SortOrder.RELEVANCE: return sorted(results, key=lambda x: x.relevance_score, reverse=True) elif sort_by == SortOrder.DATE_DESC: return sorted(results, key=lambda x: x.timestamp, reverse=True) elif sort_by == SortOrder.DATE_ASC: return sorted(results, key=lambda x: x.timestamp) elif sort_by == SortOrder.IMPORTANCE: # Custom importance scoring return sorted(results, key=self._calculate_importance_score, reverse=True) elif sort_by == SortOrder.ENGAGEMENT: return sorted(results, key=self._calculate_engagement_score, reverse=True) else: return results def _calculate_importance_score(self, result: SearchResult) -> float: """Calculate importance score for a result""" score = result.relevance_score # Boost emails marked as important if result.type == SearchResultType.EMAIL: if 'IMPORTANT' in result.metadata.get('labels', []): score *= 1.5 # Boost starred content if 'starred' in result.metadata and result.metadata['starred']: score *= 1.3 return score def _calculate_engagement_score(self, result: SearchResult) -> float: """Calculate engagement score for social media posts""" if result.type != SearchResultType.SOCIAL_POST: return result.relevance_score likes = result.metadata.get('likes', 0) shares = result.metadata.get('retweets', 0) + result.metadata.get('comments', 0) # Normalize engagement metrics engagement_score = (likes + shares * 2) / 100.0 # Arbitrary normalization return result.relevance_score + engagement_score def _generate_facets(self, results: List[SearchResult]) -> Dict[str, Dict[str, int]]: """Generate facets for search results""" facets = { 'type': {}, 'source': {}, 'date_range': {} } for result in results: # Type facets type_val = result.type.value facets['type'][type_val] = facets['type'].get(type_val, 0) + 1 # Source facets source_val = result.source 
facets['source'][source_val] = facets['source'].get(source_val, 0) + 1 # Date range facets days_old = (datetime.now() - result.timestamp).days if days_old < 1: date_range = 'today' elif days_old < 7: date_range = 'this_week' elif days_old < 30: date_range = 'this_month' elif days_old < 365: date_range = 'this_year' else: date_range = 'older' facets['date_range'][date_range] = facets['date_range'].get(date_range, 0) + 1 return facets async def _generate_search_suggestions( self, query: str, results: List[SearchResult] ) -> List[str]: """Generate search suggestions based on results""" suggestions = [] try: # Extract common terms from high-scoring results top_results = sorted(results, key=lambda x: x.relevance_score, reverse=True)[:10] # Analyze content for keywords content_texts = [f"{r.title} {r.snippet}" for r in top_results] if content_texts: keywords = await self.nlp_processor._extract_keywords(' '.join(content_texts), max_keywords=5) for keyword, score in keywords: if keyword.lower() not in query.lower(): suggestions.append(f"{query} {keyword}") # Add common refinements if 'email' in [r.type.value for r in results]: suggestions.append(f"{query} from:important") suggestions.append(f"{query} has:attachment") if 'social_post' in [r.type.value for r in results]: suggestions.append(f"{query} engagement:high") except Exception as e: logger.warning(f"Failed to generate suggestions: {e}") return suggestions[:5] # Return top 5 suggestions async def contextual_search( self, query: str, context_items: List[str], relationship_types: Optional[List[str]] = None ) -> List[ContextualSearchResult]: """Perform contextual search with relationship mapping""" # First, perform regular search search_results = await self.universal_search(query) # Then find related content for each result contextual_results = [] for result in search_results.results[:10]: # Limit to top 10 for performance related_results = await self._find_related_content(result, context_items) if related_results: context_summary = await self._generate_context_summary(result, related_results) contextual_result = ContextualSearchResult( primary_result=result, related_results=related_results, relationship_type="content_similarity", relationship_strength=0.8, # Would be calculated based on actual relationships context_summary=context_summary ) contextual_results.append(contextual_result) return contextual_results async def _find_related_content( self, primary_result: SearchResult, context_items: List[str] ) -> List[SearchResult]: """Find content related to a primary search result""" related_results = [] # Extract key terms from primary result key_terms = await self.nlp_processor._extract_keywords( f"{primary_result.title} {primary_result.content}", max_keywords=5 ) # Search for content containing these key terms for term, score in key_terms: related_search = await self.universal_search( term, page_size=5 ) # Filter out the primary result itself for related_result in related_search.results: if related_result.id != primary_result.id: related_results.append(related_result) # Remove duplicates and limit results seen_ids = set() unique_related = [] for result in related_results: if result.id not in seen_ids: seen_ids.add(result.id) unique_related.append(result) if len(unique_related) >= 5: break return unique_related async def _generate_context_summary( self, primary_result: SearchResult, related_results: List[SearchResult] ) -> str: """Generate a summary of the contextual relationships""" if not related_results: return "No related content found." 
# Simple summary generation source_counts = {} for result in related_results: source_counts[result.source] = source_counts.get(result.source, 0) + 1 summary_parts = [] summary_parts.append(f"Found {len(related_results)} related items") for source, count in source_counts.items(): summary_parts.append(f"{count} from {source}") return ", ".join(summary_parts) async def smart_filter_suggestions( self, query: str, current_results: List[SearchResult] ) -> List[SearchFilter]: """Generate smart filter suggestions based on current results""" suggestions = [] # Analyze current results for filter opportunities sources = set(r.source for r in current_results) types = set(r.type.value for r in current_results) # Suggest source filters if multiple sources if len(sources) > 1: for source in sources: count = len([r for r in current_results if r.source == source]) suggestions.append(SearchFilter( field='source', operator='eq', value=source, weight=count / len(current_results) )) # Suggest type filters if multiple types if len(types) > 1: for type_val in types: count = len([r for r in current_results if r.type.value == type_val]) suggestions.append(SearchFilter( field='type', operator='eq', value=type_val, weight=count / len(current_results) )) # Suggest date filters now = datetime.now() recent_count = len([r for r in current_results if (now - r.timestamp).days < 7]) if recent_count > 0: suggestions.append(SearchFilter( field='timestamp', operator='gte', value=now - timedelta(days=7), weight=recent_count / len(current_results) )) return sorted(suggestions, key=lambda x: x.weight, reverse=True)[:5] # Global search engine instance _search_engine: Optional[UniversalSearchEngine] = None async def get_search_engine() -> UniversalSearchEngine: """Get the global search engine instance""" global _search_engine if _search_engine is None: _search_engine = UniversalSearchEngine() await _search_engine.initialize() return _search_engine async def quick_search( query: str, scope: SearchScope = SearchScope.ALL, max_results: int = 10 ) -> List[SearchResult]: """Quick search function for simple queries""" engine = await get_search_engine() results = await engine.universal_search(query, scope=scope, page_size=max_results) return results.results async def search_with_context( query: str, context_items: List[str] ) -> List[ContextualSearchResult]: """Search with contextual relationships""" engine = await get_search_engine() return await engine.contextual_search(query, context_items)
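A minimal usage sketch follows. The import path (app.tools.search_tools) and the email address are illustrative assumptions, not taken from the repository, and the module assumes OAuth-configured clients are already available through client_manager. quick_search wraps UniversalSearchEngine.universal_search and returns a plain list of SearchResult objects ranked by relevance; a SearchFilter with field 'from' and operator 'eq' is folded into the Gmail query string as a from: operator by _search_emails.

import asyncio

# Hypothetical import path; the repository's actual package layout is not shown on this page.
from app.tools.search_tools import (
    get_search_engine, quick_search, SearchFilter, SearchScope, SortOrder
)

async def main():
    # Simple query: returns a relevance-ranked list of SearchResult objects.
    hits = await quick_search("quarterly report", scope=SearchScope.DRIVE, max_results=5)
    for hit in hits:
        print(f"[{hit.source}] {hit.title} (score={hit.relevance_score:.2f})")

    # Filtered, paginated query: the 'from'/'eq' filter becomes a Gmail "from:" clause
    # inside _search_emails; the address below is a made-up example.
    engine = await get_search_engine()
    page = await engine.universal_search(
        "project roadmap",
        scope=SearchScope.EMAIL,
        filters=[SearchFilter(field="from", operator="eq", value="alice@example.com")],
        sort_by=SortOrder.DATE_DESC,
    )
    print(f"{page.total_count} results in {page.search_time_ms:.0f} ms")

asyncio.run(main())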
