"""
Smart Search - Main orchestration service for intelligent database queries
"""
import time
import logging
from typing import Dict, List, Optional, Any
from services.schema_service import SchemaService
from services.sql_service import SQLService
from services.semantic_service import SemanticService
from services.synthesis_service import SynthesisService
from shared.models import (
SearchRequest, SearchResponse, QueryStrategy, QuestionClassification,
SQLQuery, SemanticSearchResult
)
from shared.exceptions import MCPError
logger = logging.getLogger(__name__)
class SmartSearch:
"""
Main orchestration service for intelligent database queries
Coordinates between SQL, semantic search, and synthesis services
to provide comprehensive answers to natural language questions.
"""
def __init__(
self,
schema_service: SchemaService,
sql_service: SQLService,
semantic_service: SemanticService,
synthesis_service: SynthesisService
):
self.schema_service = schema_service
self.sql_service = sql_service
self.semantic_service = semantic_service
self.synthesis_service = synthesis_service
def answer(self, question: str, **kwargs) -> SearchResponse:
"""
Main entry point for answering natural language questions
Args:
question: Natural language question
**kwargs: Additional search parameters
Returns:
SearchResponse with comprehensive answer
"""
start_time = time.time()
try:
# Create search request
request = SearchRequest(
question=question,
filters=kwargs.get('filters'),
limit=kwargs.get('limit', 10),
include_sql=kwargs.get('include_sql', True),
include_semantic=kwargs.get('include_semantic', True),
include_schema=kwargs.get('include_schema', True)
)
logger.info(f"Processing question: {question}")
# 1. Classify the question to determine strategy
classification = self._classify_question(request)
# 2. Get schema context
schema_info = None
if classification.needs_schema or classification.needs_sql:
schema_info = self.schema_service.get_schema_info()
# 3. Execute SQL queries if needed
sql_results = []
sql_queries = []
if classification.needs_sql and request.include_sql:
sql_queries, sql_results = self._execute_sql_strategy(request, schema_info)
# 4. Perform semantic search if needed
semantic_results = []
if classification.needs_semantic and request.include_semantic:
semantic_results = self._execute_semantic_strategy(request)
# 5. Synthesize final response
answer_markdown = self.synthesis_service.synthesize_response(
question=request.question,
sql_results=sql_results,
semantic_results=semantic_results,
schema_info=schema_info,
classification=classification
)
processing_time = time.time() - start_time
response = SearchResponse(
success=True,
answer_markdown=answer_markdown,
sources_used={
'sql_queries': len(sql_results),
'documents': len(semantic_results),
'tables': len(self._get_tables_used(sql_queries))
},
strategy=classification.strategy,
sql_queries=sql_queries,
semantic_results=semantic_results,
processing_time=processing_time
)
logger.info(f"Question answered successfully in {processing_time:.2f}s")
return response
except Exception as e:
processing_time = time.time() - start_time
logger.error(f"Failed to answer question: {e}")
return SearchResponse(
success=False,
answer_markdown=f"I encountered an error while processing your question: {str(e)}",
sources_used={'sql_queries': 0, 'documents': 0, 'tables': 0},
strategy=QueryStrategy.SQL_ONLY, # Default
sql_queries=[],
semantic_results=[],
processing_time=processing_time,
error=str(e)
)
def _classify_question(self, request: SearchRequest) -> QuestionClassification:
"""
Classify the question to determine the best strategy
Args:
request: Search request
Returns:
QuestionClassification with strategy and reasoning
"""
question = request.question.lower()
# Keywords that suggest SQL queries
sql_keywords = [
'how many', 'count', 'total', 'sum', 'average', 'max', 'min',
'list all', 'show all', 'find all', 'get all',
'between', 'greater than', 'less than', 'equal to',
'group by', 'order by', 'sort by',
'latest', 'recent', 'oldest', 'first', 'last'
]
# Keywords that suggest semantic search
semantic_keywords = [
'about', 'related to', 'similar to', 'like', 'concerning',
'discuss', 'mention', 'talk about', 'describe',
'what is', 'explain', 'tell me about'
]
# Keywords that suggest schema exploration
schema_keywords = [
'what tables', 'what columns', 'database structure',
'schema', 'available data', 'what data do you have'
]
# Count keyword matches
sql_score = sum(1 for keyword in sql_keywords if keyword in question)
semantic_score = sum(1 for keyword in semantic_keywords if keyword in question)
schema_score = sum(1 for keyword in schema_keywords if keyword in question)
# Determine strategy based on scores and question characteristics
if schema_score > 0:
strategy = QueryStrategy.SCHEMA_ONLY
needs_sql = False
needs_semantic = False
needs_schema = True
reasoning = "Question asks about database structure"
elif sql_score > semantic_score and sql_score > 0:
strategy = QueryStrategy.SQL_ONLY
needs_sql = True
needs_semantic = False
needs_schema = True # Need schema for SQL generation
reasoning = f"Question contains SQL-suggestive keywords (score: {sql_score})"
elif semantic_score > sql_score and semantic_score > 0:
strategy = QueryStrategy.SEMANTIC_ONLY
needs_sql = False
needs_semantic = True
needs_schema = False
reasoning = f"Question asks for semantic search (score: {semantic_score})"
elif sql_score > 0 and semantic_score > 0:
strategy = QueryStrategy.HYBRID
needs_sql = True
needs_semantic = True
needs_schema = True
reasoning = "Question could benefit from both SQL and semantic search"
else:
# Default to hybrid approach for complex questions
strategy = QueryStrategy.HYBRID
needs_sql = True
needs_semantic = True
needs_schema = True
reasoning = "General question, using hybrid approach"
# Calculate confidence based on keyword matches
total_score = sql_score + semantic_score + schema_score
confidence = min(0.9, 0.3 + (total_score * 0.15)) # Cap at 90%
return QuestionClassification(
strategy=strategy,
needs_sql=needs_sql,
needs_semantic=needs_semantic,
needs_schema=needs_schema,
confidence=confidence,
reasoning=reasoning
)
def _execute_sql_strategy(self, request: SearchRequest, schema_info) -> tuple[List[SQLQuery], List[Dict]]:
"""
Execute SQL-based search strategy
Returns:
Tuple of (sql_queries, results)
"""
try:
# Get suggested SQL queries
suggested_queries = self.sql_service.get_suggested_queries(
question=request.question,
schema_info=schema_info,
limit=3
)
results = []
successful_queries = []
for query in suggested_queries:
result = self.sql_service.execute_safe(query.sql)
if result.success and result.data:
results.extend(result.data)
successful_queries.append(query)
logger.info(f"SQL query executed: {len(result.data)} rows")
else:
logger.warning(f"SQL query failed: {result.error}")
return successful_queries, results
except Exception as e:
logger.error(f"SQL strategy failed: {e}")
return [], []
def _execute_semantic_strategy(self, request: SearchRequest) -> List[SemanticSearchResult]:
"""
Execute semantic search strategy
Returns:
List of semantic search results
"""
try:
return self.semantic_service.search(
query=request.question,
limit=request.limit,
filters=request.filters
)
except Exception as e:
logger.error(f"Semantic strategy failed: {e}")
return []
def _get_tables_used(self, sql_queries: List[SQLQuery]) -> List[str]:
"""Extract unique table names from SQL queries"""
tables = set()
for query in sql_queries:
# Simple extraction - could be improved with SQL parsing
sql_lower = query.sql.lower()
words = sql_lower.split()
# Look for FROM and JOIN clauses
for i, word in enumerate(words):
if word in ['from', 'join'] and i + 1 < len(words):
table_name = words[i + 1].strip('(),;')
if table_name and not table_name.startswith('('):
tables.add(table_name)
return list(tables)
def get_capabilities(self) -> Dict[str, Any]:
"""Get information about search capabilities"""
try:
schema_info = self.schema_service.get_schema_info()
return {
'tables_available': len(schema_info.tables),
'total_columns': sum(len(table['columns']) for table in schema_info.tables),
'relationships': len(schema_info.relationships),
'vector_search_available': self.semantic_service.is_vector_search_available(),
'supported_strategies': [strategy.value for strategy in QueryStrategy]
}
except Exception as e:
logger.error(f"Could not get capabilities: {e}")
return {
'error': str(e),
'vector_search_available': False,
'supported_strategies': [strategy.value for strategy in QueryStrategy]
}