"""
Smart Search orchestrator service
Coordinates between different search strategies and data sources
"""
import logging
import re
from typing import Dict, List, Optional, Any, Tuple
from enum import Enum

from services.schema_service import SchemaService
from services.sql_service import SQLService
from services.semantic_service import SemanticService
from services.synthesis_service import SynthesisService
from shared.models import QuestionClassification, SearchStrategy, SemanticSearchResult
from shared.exceptions import SmartSearchError
logger = logging.getLogger(__name__)
class SmartSearchService:
    """
    Orchestrates intelligent search across SQL and semantic sources.

    A question is classified with keyword heuristics into a search strategy
    (SQL only, semantic only, or hybrid). The matching back ends are queried
    and their results are handed to the synthesis service, which produces
    the final response.
    """

    # Phrases that hint at a data / aggregation question (SQL path).
    _SQL_INDICATORS = (
        'how many', 'count', 'total', 'sum', 'average', 'max', 'min',
        'list all', 'show me', 'find all', 'get all',
        'between', 'greater than', 'less than', 'equals',
        'group by', 'order by', 'where',
    )

    # Phrases that hint at a conceptual question (semantic path).
    _SEMANTIC_INDICATORS = (
        'explain', 'what is', 'how does', 'why', 'describe',
        'meaning', 'definition', 'concept', 'understand',
        'similar to', 'related to', 'about',
    )

    # Phrases that mark a question as being about the schema itself.
    _SCHEMA_INDICATORS = (
        'table', 'column', 'field', 'structure', 'schema',
        'what data', 'what information', 'available',
    )

    # Keywords used to derive semantic-search filters from the question.
    _TABLE_KEYWORDS = ('user', 'product', 'order', 'customer', 'invoice', 'payment')
    _FK_KEYWORDS = ('related', 'connected', 'linked')

    def __init__(
        self,
        schema_service: SchemaService,
        sql_service: SQLService,
        semantic_service: SemanticService,
        synthesis_service: SynthesisService,
        llm_config: Optional[Dict] = None
    ):
        """
        Store the collaborating services.

        Args:
            schema_service: Provides schema info and relevant-table lookup
            sql_service: Suggests, validates and executes SQL
            semantic_service: Performs semantic search and reports its stats
            synthesis_service: Builds the final response from all results
            llm_config: Optional configuration dict; stored as an empty
                dict when omitted
        """
        self.schema_service = schema_service
        self.sql_service = sql_service
        self.semantic_service = semantic_service
        self.synthesis_service = synthesis_service
        self.llm_config = llm_config or {}

    async def search(
        self,
        question: str,
        include_schema: bool = True,
        max_sql_queries: int = 3,
        max_semantic_results: int = 10
    ) -> Dict[str, Any]:
        """
        Main smart search entry point.

        Classifies the question, runs the SQL and/or semantic strategy that
        the classification selects, and synthesizes a response.

        Args:
            question: Natural language question
            include_schema: When true, schema info is always fetched and
                passed to synthesis; when false it is fetched only if the
                classifier flags the question as schema-related
            max_sql_queries: Maximum SQL queries to execute
            max_semantic_results: Maximum semantic search results

        Returns:
            On success a dict with 'success', 'question', 'strategy_used',
            'response' and 'metadata'. On failure a dict with
            'success' (False), 'error' and 'response'. This method does not
            raise; failures are reported through the returned dict.
        """
        # A missing or blank question cannot be classified or searched, so
        # fail fast using the same shape as every other failure.
        if not isinstance(question, str) or not question.strip():
            message = "Question must be a non-empty string"
            return {
                'success': False,
                'error': message,
                'response': f"I encountered an error while searching: {message}"
            }

        try:
            logger.info("Smart Search: %s", question)

            # Step 1: Classify the question
            classification = self._classify_question(question)
            strategy = classification.strategy
            logger.info("Question classified as: %s", strategy.value)

            # Step 2: Get schema info if needed
            schema_info = None
            if classification.needs_schema or include_schema:
                schema_info = self.schema_service.get_schema_info()

            # Step 3: Execute appropriate search strategy
            sql_results = []
            semantic_results = []
            if strategy in (SearchStrategy.SQL_ONLY, SearchStrategy.HYBRID):
                sql_results = await self._execute_sql_strategy(
                    question, schema_info, max_sql_queries
                )
            if strategy in (SearchStrategy.SEMANTIC_ONLY, SearchStrategy.HYBRID):
                semantic_results = await self._execute_semantic_strategy(
                    question, classification, max_semantic_results
                )

            # Step 4: Synthesize comprehensive response
            response = self.synthesis_service.synthesize_response(
                question=question,
                sql_results=sql_results,
                semantic_results=semantic_results,
                schema_info=schema_info,
                classification=classification
            )

            return {
                'success': True,
                'question': question,
                'strategy_used': strategy.value,
                'response': response,
                'metadata': {
                    'sql_queries_executed': len(sql_results),
                    'semantic_results_found': len(semantic_results),
                    'confidence': classification.confidence,
                    'reasoning': classification.reasoning
                }
            }
        except Exception as e:
            # Top-level boundary: report the failure instead of raising, but
            # keep the traceback in the log.
            logger.exception("Smart search failed: %s", e)
            return {
                'success': False,
                'error': str(e),
                'response': f"I encountered an error while searching: {str(e)}"
            }

    @staticmethod
    def _count_indicator_hits(text: str, indicators: Tuple[str, ...]) -> int:
        """
        Count how many indicators occur in text at the start of a word.

        Requiring a word boundary in front of the indicator stops 'count'
        from matching "account" or 'min' from matching "admin", while
        inflected forms such as "counts" or "tables" still match.

        Args:
            text: Text to scan; callers pass it already lower-cased
            indicators: Lower-case keywords or phrases

        Returns:
            Number of indicators that matched at least once
        """
        return sum(
            1 for indicator in indicators
            if re.search(r'\b' + re.escape(indicator), text)
        )

    def _classify_question(self, question: str) -> QuestionClassification:
        """
        Classify question to determine search strategy.

        This is a keyword heuristic - could be enhanced with an LLM.

        Args:
            question: Natural language question

        Returns:
            QuestionClassification carrying the strategy, a fixed confidence
            value per branch, a reasoning string and the needs_schema flag
        """
        question_lower = question.lower()
        sql_score = self._count_indicator_hits(question_lower, self._SQL_INDICATORS)
        semantic_score = self._count_indicator_hits(
            question_lower, self._SEMANTIC_INDICATORS
        )

        # Scores are never negative, so "greater than the other score"
        # already implies "greater than zero".
        if sql_score > semantic_score:
            if semantic_score > 0:
                strategy = SearchStrategy.HYBRID
                confidence = 0.7
                reasoning = "Question contains both SQL and semantic indicators"
            else:
                strategy = SearchStrategy.SQL_ONLY
                confidence = 0.8
                reasoning = "Question appears to be asking for specific data"
        elif semantic_score > sql_score:
            # NOTE(review): a semantic majority ignores any SQL indicators,
            # whereas an SQL majority with semantic hits goes hybrid -
            # confirm this asymmetry is intended.
            strategy = SearchStrategy.SEMANTIC_ONLY
            confidence = 0.8
            reasoning = "Question appears to be asking for conceptual information"
        else:
            # Tie (including no indicators at all): default to hybrid
            strategy = SearchStrategy.HYBRID
            confidence = 0.5
            reasoning = "Question type unclear, using hybrid approach"

        # Check if schema info is needed
        needs_schema = self._count_indicator_hits(
            question_lower, self._SCHEMA_INDICATORS
        ) > 0

        return QuestionClassification(
            strategy=strategy,
            confidence=confidence,
            reasoning=reasoning,
            needs_schema=needs_schema
        )

    async def _execute_sql_strategy(
        self,
        question: str,
        schema_info,
        max_queries: int
    ) -> List[Dict]:
        """
        Execute SQL-based search strategy.

        Args:
            question: Natural language question
            schema_info: Schema info from the schema service, or None, in
                which case it is fetched here
            max_queries: Maximum number of suggested queries to run; values
                of zero or less run nothing

        Returns:
            Rows collected from every query that validated and executed
            successfully, in query order. Empty when no relevant tables are
            found or when preparation fails. Never raises.
        """
        # A negative bound would otherwise turn the slice below into
        # "all but the last N" instead of "none".
        if max_queries <= 0:
            return []

        try:
            # Find relevant tables
            relevant_tables = self.schema_service.find_relevant_tables(question)
            if not relevant_tables:
                logger.info("No relevant tables found for SQL strategy")
                return []

            # search() leaves schema_info as None when the caller opts out of
            # schema and the question is not schema-related. Presumably query
            # suggestion still needs the schema, so fetch it locally.
            # NOTE(review): confirm against SQLService.get_suggested_queries.
            if schema_info is None:
                schema_info = self.schema_service.get_schema_info()

            # Get suggested queries
            suggested_queries = self.sql_service.get_suggested_queries(
                question, schema_info, limit=max_queries
            ) or []
        except Exception as e:
            logger.exception("SQL strategy failed: %s", e)
            return []

        sql_results = []
        for query in suggested_queries[:max_queries]:
            # Isolate each query so one failure does not discard the rows
            # already collected from earlier queries.
            try:
                # Validate before executing
                error = self.sql_service.validate_sql(query.sql)
                if error:
                    logger.warning("Skipping invalid SQL: %s", error)
                    continue

                logger.info("Executing SQL: %s", query.sql)
                result = self.sql_service.execute_safe(query.sql)
                if result.success and result.data:
                    sql_results.extend(result.data)
            except Exception as e:
                logger.exception(
                    "SQL query failed, continuing with remaining queries: %s", e
                )
        return sql_results

    async def _execute_semantic_strategy(
        self,
        question: str,
        classification: QuestionClassification,
        max_results: int
    ) -> List[SemanticSearchResult]:
        """
        Execute semantic search strategy.

        Args:
            question: Natural language question, used as the search query
            classification: Classification of the question (currently not
                consulted by this method)
            max_results: Maximum number of results to request

        Returns:
            Results from the semantic service, or an empty list when the
            search fails or yields nothing. Never raises.
        """
        try:
            # Determine search filters based on question
            filters = self._extract_search_filters(question)

            # Perform semantic search; treat a None result as "no results"
            results = self.semantic_service.search(
                query=question,
                limit=max_results,
                filters=filters
            ) or []
            logger.info("Semantic search returned %d results", len(results))
            return results
        except Exception as e:
            logger.exception("Semantic strategy failed: %s", e)
            return []

    def _extract_search_filters(self, question: str) -> Optional[Dict[str, Any]]:
        """
        Extract search filters from question.

        Args:
            question: Natural language question

        Returns:
            Dict that may contain 'table_filter' (the first known table
            keyword found in the question) and 'fk_filter' (True when the
            question mentions relationships), or None when neither applies
        """
        filters = {}
        question_lower = question.lower()

        # Table-specific filter: the first keyword, in declaration order, wins
        table_keyword = next(
            (kw for kw in self._TABLE_KEYWORDS if kw in question_lower), None
        )
        if table_keyword is not None:
            filters['table_filter'] = table_keyword

        # Foreign key filter
        if any(word in question_lower for word in self._FK_KEYWORDS):
            filters['fk_filter'] = True

        return filters or None

    async def get_search_capabilities(self) -> Dict[str, Any]:
        """
        Get information about search system capabilities.

        Returns:
            Dict with 'sql_search', 'semantic_search', 'schema_info' and
            'search_stats'. If gathering the stats fails, a dict with
            'error' and both search kinds marked unavailable.
        """
        try:
            # Get stats from all services
            schema_stats = {
                'tables_available': len(self.schema_service.get_schema_info().tables)
            }
            semantic_stats = self.semantic_service.get_search_stats()

            return {
                'sql_search': {
                    'available': True,
                    'capabilities': ['data_queries', 'aggregations', 'filtering', 'joins']
                },
                'semantic_search': {
                    'available': semantic_stats.get('vector_search_available', False),
                    'capabilities': ['concept_search', 'similarity_matching', 'text_fallback']
                },
                'schema_info': schema_stats,
                'search_stats': semantic_stats
            }
        except Exception as e:
            logger.exception("Could not get search capabilities: %s", e)
            return {
                'error': str(e),
                'sql_search': {'available': False},
                'semantic_search': {'available': False}
            }

    async def suggest_questions(self, topic: Optional[str] = None) -> List[str]:
        """
        Suggest example questions based on available data.

        Args:
            topic: Optional keyword. When given, suggestions are built from
                tables whose name contains it (case-insensitive). If no
                table matches, all tables are used as if topic were omitted.

        Returns:
            Up to 10 suggested questions. A fixed generic list is returned
            when schema information cannot be read.
        """
        try:
            schema_info = self.schema_service.get_schema_info()
            tables = list(schema_info.tables)

            # Narrow to tables matching the topic, if any do
            topic_lower = topic.strip().lower() if topic else ""
            if topic_lower:
                matching = [
                    table for table in tables
                    if topic_lower in table['table_name'].lower()
                ]
                if matching:
                    tables = matching

            suggestions = []
            # Generate suggestions based on available tables
            for table in tables[:5]:  # Limit to first 5 tables
                table_name = table['table_name']

                # Count questions
                suggestions.append(f"How many records are in {table_name}?")
                # List questions
                suggestions.append(f"Show me the latest {table_name} records")

                # If it looks like a user/customer table
                if any(word in table_name.lower() for word in ('user', 'customer', 'person')):
                    suggestions.append(f"What information do we store about {table_name}?")

            # Add general questions
            # NOTE(review): with five tables the per-table suggestions already
            # number at least ten, so these are cut by the truncation below.
            suggestions.extend([
                "What tables are available in the database?",
                "Explain the database structure",
                "What kind of data can I query?"
            ])
            return suggestions[:10]  # Return top 10 suggestions
        except Exception as e:
            logger.exception("Could not generate question suggestions: %s", e)
            return [
                "What data is available?",
                "Show me some example records",
                "How is the data structured?"
            ]