Skip to main content
Glama
intelligent_retrieval.py (22.8 kB)
"""
Intelligent Data Retrieval Service

Implements smart data retrieval with context awareness, semantic search, and
RAG (Retrieval-Augmented Generation) capabilities for the Personal Assistant.
"""

import asyncio
import logging
import re
from typing import List, Dict, Any, Optional, Tuple, Union
from datetime import datetime, timedelta
from dataclasses import dataclass
import json

from .database_interface import DatabaseInterface
from .embedding_service import EmbeddingService, get_embedding_service
from .models import Project, Todo, CalendarEvent, StatusEntry, PersonalData

logger = logging.getLogger(__name__)


@dataclass
class SearchContext:
    """Context information for intelligent retrieval.

    Fields left as ``None`` (intent, time_scope, priority_filter,
    content_types) are filled in by ``ContextualRetriever.retrieve``.
    """

    user_id: str
    tenant_id: str
    query: str
    intent: Optional[str] = None  # "project_search", "todo_planning", "status_update", etc.
    time_scope: Optional[str] = None  # "today", "this_week", "this_month", "recent", "overdue"
    priority_filter: Optional[str] = None  # "high", "medium", "low"
    content_types: Optional[List[str]] = None  # ["projects", "todos", "documents", "events"]
    max_results: int = 10
    similarity_threshold: float = 0.7


@dataclass
class RetrievalResult:
    """A single item returned by intelligent retrieval."""

    content_type: str  # "project", "todo", "event" or "document"
    item_id: str
    title: str
    description: Optional[str]
    relevance_score: float
    context_match: Dict[str, Any]
    metadata: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a plain dict (e.g. for JSON responses)."""
        return {
            "content_type": self.content_type,
            "item_id": self.item_id,
            "title": self.title,
            "description": self.description,
            "relevance_score": self.relevance_score,
            "context_match": self.context_match,
            "metadata": self.metadata,
        }


class QueryIntentClassifier:
    """Classifies user queries to determine intent and context.

    Classification is keyword based. A keyword counts as present when it occurs
    in the lower-cased query starting at a word boundary, so "project" matches
    "projects" but "now" does not match "know".
    """

    INTENT_KEYWORDS = {
        "project_search": ["project", "working on", "building", "developing", "task list"],
        "todo_planning": ["todo", "tasks", "need to do", "plan", "schedule", "deadline"],
        "status_update": ["status", "current", "what am i", "progress", "working", "doing"],
        "calendar_query": ["meeting", "event", "calendar", "schedule", "appointment"],
        "document_search": ["document", "file", "notes", "wrote", "saved", "remember"],
        "review": ["review", "summary", "overview", "report", "recap"],
    }

    TIME_KEYWORDS = {
        "today": ["today", "now", "current", "this moment"],
        "this_week": ["this week", "week", "weekly", "7 days"],
        "this_month": ["this month", "month", "monthly", "30 days"],
        "recent": ["recent", "recently", "latest", "new"],
        "overdue": ["overdue", "late", "past due", "missed"],
    }

    PRIORITY_KEYWORDS = {
        "high": ["urgent", "important", "high priority", "critical", "asap"],
        "medium": ["medium", "normal", "standard"],
        "low": ["low priority", "later", "when time permits", "eventually"],
    }

    @staticmethod
    def _mentions(text: str, keyword: str) -> bool:
        """Return True if *keyword* appears in *text* at the start of a word."""
        # Only a leading \b. Plain substring matching produced false hits such as
        # "now" in "know", "event" in "prevent" and "late" in "template". A
        # trailing \b would stop plurals ("todos", "projects") from matching.
        return re.search(r"\b" + re.escape(keyword), text) is not None

    @staticmethod
    def classify_intent(query: str) -> str:
        """Return the best-scoring intent for *query*, or "general_search".

        Each intent scores one point per matching keyword. Ties go to the intent
        listed first in INTENT_KEYWORDS, because ``max`` keeps the first maximum.
        """
        query_lower = query.lower()
        intent_scores = {}

        for intent, keywords in QueryIntentClassifier.INTENT_KEYWORDS.items():
            score = sum(
                1 for keyword in keywords
                if QueryIntentClassifier._mentions(query_lower, keyword)
            )
            if score > 0:
                intent_scores[intent] = score

        if intent_scores:
            return max(intent_scores.items(), key=lambda x: x[1])[0]

        return "general_search"

    @staticmethod
    def extract_time_scope(query: str) -> Optional[str]:
        """Return the first time scope (in TIME_KEYWORDS order) mentioned in *query*, else None."""
        query_lower = query.lower()

        for scope, keywords in QueryIntentClassifier.TIME_KEYWORDS.items():
            if any(QueryIntentClassifier._mentions(query_lower, keyword) for keyword in keywords):
                return scope

        return None

    @staticmethod
    def extract_priority_filter(query: str) -> Optional[str]:
        """Return the first priority (in PRIORITY_KEYWORDS order) mentioned in *query*, else None."""
        query_lower = query.lower()

        for priority, keywords in QueryIntentClassifier.PRIORITY_KEYWORDS.items():
            if any(QueryIntentClassifier._mentions(query_lower, keyword) for keyword in keywords):
                return priority

        return None


class ContextualRetriever:
    """Performs contextual retrieval based on user intent and history."""

    def __init__(self, db: DatabaseInterface, embedding_service: EmbeddingService):
        self.db = db
        self.embedding_service = embedding_service
        self.intent_classifier = QueryIntentClassifier()

    async def retrieve(self, context: SearchContext) -> List[RetrievalResult]:
        """Perform intelligent retrieval based on *context*.

        Any unset intent, time scope, priority filter or content types are
        inferred from ``context.query``. They are written back onto *context*
        in place. Each selected content type is then searched, and the top
        ``context.max_results`` results by relevance score are returned.

        NOTE(review): ``context.user_id`` / ``context.tenant_id`` are never
        passed to a DB call in this class. Presumably DatabaseInterface scopes
        queries itself; confirm, otherwise results may cross users/tenants.
        """
        # Enhance context with intent classification if not provided
        if not context.intent:
            context.intent = self.intent_classifier.classify_intent(context.query)

        if not context.time_scope:
            context.time_scope = self.intent_classifier.extract_time_scope(context.query)

        if not context.priority_filter:
            context.priority_filter = self.intent_classifier.extract_priority_filter(context.query)

        logger.info(
            f"Intelligent retrieval - Intent: {context.intent}, "
            f"Time: {context.time_scope}, Priority: {context.priority_filter}"
        )

        # Determine content types to search based on intent
        if not context.content_types:
            context.content_types = self._determine_content_types(context.intent)

        results: List[RetrievalResult] = []

        if "projects" in context.content_types:
            results.extend(await self._retrieve_projects(context))

        if "todos" in context.content_types:
            results.extend(await self._retrieve_todos(context))

        if "events" in context.content_types:
            results.extend(await self._retrieve_events(context))

        if "documents" in context.content_types:
            results.extend(await self._retrieve_documents(context))

        # Sort by relevance score and apply limit
        results.sort(key=lambda x: x.relevance_score, reverse=True)
        return results[:context.max_results]

    def _determine_content_types(self, intent: str) -> List[str]:
        """Map an intent to the content types to search (default: projects and todos)."""
        intent_mapping = {
            "project_search": ["projects", "todos"],
            "todo_planning": ["todos", "projects"],
            "status_update": ["projects", "todos", "events"],
            "calendar_query": ["events"],
            "document_search": ["documents"],
            "review": ["projects", "todos", "events", "documents"],
            "general_search": ["projects", "todos", "events", "documents"],
        }

        return intent_mapping.get(intent, ["projects", "todos"])

    async def _retrieve_projects(self, context: SearchContext) -> List[RetrievalResult]:
        """Retrieve projects with contextual filtering.

        Uses ``db.semantic_search_projects`` when the database provides it.
        Otherwise it falls back to a word-overlap ratio
        (``_calculate_text_similarity``) over up to 50 projects. Errors are
        logged, and the results collected so far are returned.
        """
        results: List[RetrievalResult] = []

        try:
            # Perform vector search if available
            if hasattr(self.db, 'semantic_search_projects'):
                query_embedding = await self.embedding_service.generate_embedding(context.query)
                search_results = await self.db.semantic_search_projects(
                    query_embedding,
                    limit=context.max_results,
                    similarity_threshold=context.similarity_threshold,
                )

                for project, similarity in search_results:
                    # Apply contextual filtering
                    if not self._matches_context_filters(project, context):
                        continue
                    results.append(RetrievalResult(
                        content_type="project",
                        item_id=project.id,
                        title=project.name,
                        description=project.description,
                        relevance_score=similarity,
                        context_match={
                            "intent_match": context.intent == "project_search",
                            "priority_match": not context.priority_filter or project.priority == context.priority_filter,
                            "time_match": self._check_time_relevance(project.updated_date, context.time_scope),
                        },
                        metadata={
                            "priority": project.priority,
                            "status": project.status,
                            "tags": project.tags,
                            "created_date": project.created_date.isoformat(),
                            "updated_date": project.updated_date.isoformat(),
                        },
                    ))
            else:
                # Fallback to basic search
                all_projects = await self.db.get_projects(limit=50)
                filtered_projects = [p for p in all_projects if self._matches_context_filters(p, context)]

                for project in filtered_projects:
                    # Simple text similarity. Use "" for a missing description so
                    # the literal word "None" is not compared against the query.
                    project_text = f"{project.name} {project.description or ''}"
                    similarity = self._calculate_text_similarity(context.query, project_text)

                    if similarity > context.similarity_threshold:
                        results.append(RetrievalResult(
                            content_type="project",
                            item_id=project.id,
                            title=project.name,
                            description=project.description,
                            relevance_score=similarity,
                            context_match={"text_match": True},
                            metadata={"priority": project.priority, "status": project.status},
                        ))

        except Exception as e:
            # Best-effort: one failing source must not break the whole search,
            # but keep the traceback for diagnosis.
            logger.exception(f"Error retrieving projects: {e}")

        return results

    async def _retrieve_todos(self, context: SearchContext) -> List[RetrievalResult]:
        """Retrieve todos with contextual filtering.

        Fetches up to 100 todos and drops those rejected by
        ``_matches_todo_context``. Each remaining todo is embedded individually
        (one embedding call per todo), and todos whose cosine similarity to the
        query exceeds the threshold are kept. Errors are logged, and the results
        collected so far are returned.
        """
        results: List[RetrievalResult] = []

        try:
            todos = await self.db.get_todos(limit=100)

            # NOTE(review): the time-scope part of this filter checks updated_date
            # (via _matches_context_filters), not the due date. Confirm this is
            # intended for todos.
            filtered_todos = [todo for todo in todos if self._matches_todo_context(todo, context)]

            query_embedding = await self.embedding_service.generate_embedding(context.query)

            for todo in filtered_todos:
                todo_text = f"{todo.title} {todo.description or ''}"
                todo_embedding = await self.embedding_service.generate_embedding(todo_text)
                similarity = self.embedding_service.cosine_similarity(query_embedding, todo_embedding)

                if similarity > context.similarity_threshold:
                    results.append(RetrievalResult(
                        content_type="todo",
                        item_id=todo.id,
                        title=todo.title,
                        description=todo.description,
                        relevance_score=similarity,
                        context_match={
                            "priority_match": not context.priority_filter or todo.priority == context.priority_filter,
                            "completion_relevant": self._is_completion_relevant(todo, context),
                            "time_match": self._check_todo_time_relevance(todo, context.time_scope),
                        },
                        metadata={
                            "priority": todo.priority,
                            "completed": todo.completed,
                            "due_date": todo.due_date.isoformat() if todo.due_date else None,
                            "project_id": todo.project_id,
                        },
                    ))

        except Exception as e:
            logger.exception(f"Error retrieving todos: {e}")

        return results

    async def _retrieve_events(self, context: SearchContext) -> List[RetrievalResult]:
        """Retrieve calendar events in the context's time window, ranked by embedding similarity.

        When no time scope is set, ``(None, None)`` is passed to
        ``db.get_calendar_events``. How the DB treats that is not visible here.
        """
        results: List[RetrievalResult] = []

        try:
            start_date, end_date = self._get_time_range(context.time_scope)
            events = await self.db.get_calendar_events(start_date, end_date)

            query_embedding = await self.embedding_service.generate_embedding(context.query)

            for event in events:
                event_text = f"{event.title} {event.description or ''}"
                event_embedding = await self.embedding_service.generate_embedding(event_text)
                similarity = self.embedding_service.cosine_similarity(query_embedding, event_embedding)

                if similarity > context.similarity_threshold:
                    results.append(RetrievalResult(
                        content_type="event",
                        item_id=event.id,
                        title=event.title,
                        description=event.description,
                        relevance_score=similarity,
                        context_match={
                            "time_relevant": True,
                            "upcoming": event.start_time > datetime.now(),
                        },
                        metadata={
                            "start_time": event.start_time.isoformat(),
                            "end_time": event.end_time.isoformat(),
                            "location": event.location,
                            "attendees": event.attendees,
                        },
                    ))

        except Exception as e:
            logger.exception(f"Error retrieving events: {e}")

        return results

    async def _retrieve_documents(self, context: SearchContext) -> List[RetrievalResult]:
        """Retrieve documents via ``db.hybrid_search_documents`` (text + vector).

        Returns an empty list when the database does not provide hybrid search.
        The description is the first 200 characters of the content, with "..."
        appended when truncated.
        """
        results: List[RetrievalResult] = []

        try:
            query_embedding = await self.embedding_service.generate_embedding(context.query)

            if hasattr(self.db, 'hybrid_search_documents'):
                search_results = await self.db.hybrid_search_documents(
                    context.query,
                    query_embedding,
                    limit=context.max_results,
                )

                for doc_data in search_results:
                    # `or ""` also covers an explicit None value, which used to crash len().
                    content = doc_data.get("content") or ""
                    description = content[:200] + "..." if len(content) > 200 else content

                    results.append(RetrievalResult(
                        content_type="document",
                        item_id=doc_data["id"],
                        title=doc_data["title"],
                        description=description,
                        relevance_score=doc_data.get("combined_score", 0.0),
                        context_match={
                            "text_score": doc_data.get("text_score", 0.0),
                            "semantic_score": doc_data.get("semantic_score", 0.0),
                        },
                        metadata={
                            "file_path": doc_data.get("file_path"),
                            "mime_type": doc_data.get("mime_type"),
                            "size_bytes": doc_data.get("size_bytes"),
                            "created_date": doc_data.get("created_date"),
                            "updated_date": doc_data.get("updated_date"),
                        },
                    ))

        except Exception as e:
            logger.exception(f"Error retrieving documents: {e}")

        return results

    def _matches_context_filters(self, item: Union[Project, Todo], context: SearchContext) -> bool:
        """Return False if *item* fails the priority filter or the time scope (by updated_date)."""
        # Priority filter
        if context.priority_filter and hasattr(item, 'priority') and item.priority != context.priority_filter:
            return False

        # Time scope filter
        if context.time_scope and hasattr(item, 'updated_date'):
            if not self._check_time_relevance(item.updated_date, context.time_scope):
                return False

        return True

    def _matches_todo_context(self, todo: Todo, context: SearchContext) -> bool:
        """Apply todo-specific completion rules, then the generic context filters."""
        # Don't show completed todos for planning contexts unless specifically asked
        if context.intent == "todo_planning" and todo.completed:
            if "completed" not in context.query.lower():
                return False

        # Show only incomplete todos for status updates
        if context.intent == "status_update" and todo.completed:
            return False

        return self._matches_context_filters(todo, context)

    def _check_time_relevance(self, item_date: Optional[datetime], time_scope: Optional[str]) -> bool:
        """Return True if *item_date* falls within *time_scope*.

        Scopes that need no date ("overdue", unknown, or none) always pass.
        For a date-based scope, an undated item does not match.
        """
        if not time_scope or time_scope not in ("today", "this_week", "this_month", "recent"):
            return True

        if item_date is None:
            # Previously raised AttributeError, which the caller's broad except
            # turned into an empty result set for the whole content type.
            return False

        now = datetime.now()

        if time_scope == "today":
            return item_date.date() == now.date()
        if time_scope == "this_week":
            # Compare calendar dates so the week starts at Monday 00:00, not at
            # the current time-of-day on Monday. Bound it by the following Monday
            # so that items dated in later weeks do not count as "this week".
            week_start = (now - timedelta(days=now.weekday())).date()
            return week_start <= item_date.date() < week_start + timedelta(days=7)
        if time_scope == "this_month":
            return item_date.year == now.year and item_date.month == now.month
        # "recent": within the last 7 days (future dates also pass)
        return now - item_date <= timedelta(days=7)

    def _check_todo_time_relevance(self, todo: Todo, time_scope: Optional[str]) -> bool:
        """Check todo time relevance, preferring the due date over updated/created dates."""
        if not time_scope:
            return True

        if time_scope == "overdue":
            # Only a dated, still-open todo can be overdue. Previously a todo
            # without a due date fell through and was reported as overdue.
            return (
                todo.due_date is not None
                and todo.due_date < datetime.now()
                and not todo.completed
            )

        # Check due date relevance
        if todo.due_date:
            return self._check_time_relevance(todo.due_date, time_scope)

        # Fallback to updated/created date
        return self._check_time_relevance(todo.updated_date or todo.created_date, time_scope)

    def _is_completion_relevant(self, todo: Todo, context: SearchContext) -> bool:
        """Return whether the todo's completion state fits what the query is asking for."""
        if "completed" in context.query.lower():
            return todo.completed
        elif context.intent in ["todo_planning", "status_update"]:
            return not todo.completed
        return True

    def _get_time_range(self, time_scope: Optional[str]) -> Tuple[Optional[datetime], Optional[datetime]]:
        """Return a half-open ``[start, end)`` range for day/week/month scopes, else ``(None, None)``."""
        if not time_scope:
            return None, None

        now = datetime.now()

        if time_scope == "today":
            start = now.replace(hour=0, minute=0, second=0, microsecond=0)
            end = start + timedelta(days=1)
            return start, end
        elif time_scope == "this_week":
            start = now - timedelta(days=now.weekday())
            start = start.replace(hour=0, minute=0, second=0, microsecond=0)
            end = start + timedelta(days=7)
            return start, end
        elif time_scope == "this_month":
            start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
            if now.month == 12:
                end = start.replace(year=now.year + 1, month=1)
            else:
                end = start.replace(month=now.month + 1)
            return start, end

        return None, None

    def _calculate_text_similarity(self, query: str, text: str) -> float:
        """Return the fraction of distinct query words that also appear in *text* (0.0 for an empty query)."""
        query_words = set(query.lower().split())
        text_words = set(text.lower().split())

        if not query_words:
            return 0.0

        intersection = query_words.intersection(text_words)
        return len(intersection) / len(query_words)


class IntelligentRetrievalService:
    """Main service for intelligent data retrieval."""

    def __init__(self, db: DatabaseInterface, embedding_service: Optional[EmbeddingService] = None):
        self.db = db
        self.embedding_service = embedding_service or get_embedding_service()
        self.retriever = ContextualRetriever(db, self.embedding_service)

    async def search(self, user_id: str, tenant_id: str, query: str, **kwargs) -> Dict[str, Any]:
        """Perform intelligent search with context awareness.

        Recognised kwargs: intent, time_scope, priority_filter, content_types,
        max_results (default 10), similarity_threshold (default 0.7). Returns the
        serialised results together with the context that retrieval inferred.
        """
        context = SearchContext(
            user_id=user_id,
            tenant_id=tenant_id,
            query=query,
            intent=kwargs.get('intent'),
            time_scope=kwargs.get('time_scope'),
            priority_filter=kwargs.get('priority_filter'),
            content_types=kwargs.get('content_types'),
            max_results=kwargs.get('max_results', 10),
            similarity_threshold=kwargs.get('similarity_threshold', 0.7),
        )

        results = await self.retriever.retrieve(context)

        return {
            "query": query,
            "context": {
                "intent": context.intent,
                "time_scope": context.time_scope,
                "priority_filter": context.priority_filter,
                "content_types": context.content_types,
            },
            "results": [result.to_dict() for result in results],
            "total_results": len(results),
            "retrieval_metadata": {
                "similarity_threshold": context.similarity_threshold,
                "max_results": context.max_results,
                "search_timestamp": datetime.now().isoformat(),
            },
        }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/swapnilsurdi/mcp-pa'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.