"""
Synthesis service for LLM response generation
"""
import logging
from typing import Dict, List, Optional, Any
from shared.models import SemanticSearchResult, QuestionClassification
from shared.exceptions import SynthesisError
# Module-level logger, named after this module via __name__; used by
# SynthesisService to report formatting and synthesis failures.
logger = logging.getLogger(__name__)
class SynthesisService:
    """
    Service for LLM response synthesis and generation.

    Assembles SQL rows, semantic-search hits and schema information into a
    single Markdown answer. Every formatter is best-effort: a failure is
    logged and replaced by a short fallback string rather than raised.
    """

    # Display limits applied by the formatters below.
    MAX_SQL_ROWS = 10
    MAX_SEMANTIC_RESULTS = 5
    MAX_SNIPPET_CHARS = 200
    MAX_SCHEMA_TABLES = 10

    def __init__(self, llm_config):
        """Store the LLM configuration; it is not otherwise used in this class."""
        self.llm_config = llm_config

    def synthesize_response(
        self,
        question: str,
        sql_results: List[Dict],
        semantic_results: "List[SemanticSearchResult]",
        schema_info: Any = None,
        classification: "Optional[QuestionClassification]" = None,
    ) -> str:
        """
        Synthesize a comprehensive response from multiple data sources.

        Args:
            question: Original question (currently not used in the output).
            sql_results: Results from SQL queries, one dict per row.
            semantic_results: Results from semantic search; each item is read
                through its ``content`` and ``score`` attributes.
            schema_info: Database schema information; only rendered when
                ``classification.needs_schema`` is truthy.
            classification: Question classification; ``strategy.value`` is
                quoted in the introduction line.

        Returns:
            Synthesized markdown response. When no data section can be built,
            a fixed "couldn't find any relevant information" message is
            returned. On an unexpected error, a message containing the error
            text is returned instead of raising.
        """
        try:
            sections = []

            if sql_results:
                sections.append("## Database Query Results")
                sections.append(self._format_sql_results(sql_results))

            if semantic_results:
                sections.append("## Related Information")
                sections.append(self._format_semantic_results(semantic_results))

            if schema_info and classification and classification.needs_schema:
                sections.append("## Database Structure")
                sections.append(self._format_schema_info(schema_info))

            # Decide on the fallback *before* adding the introduction:
            # otherwise a classified question with no data would yield a
            # response consisting of the intro line alone.
            if not sections:
                return "I couldn't find any relevant information to answer your question."

            if classification:
                sections.insert(
                    0,
                    f"*Based on your question, I used a {classification.strategy.value} approach.*\n",
                )

            return self.clean_markdown("\n\n".join(sections))
        except Exception as e:
            # Top-level boundary: callers always receive a string.
            logger.exception("Response synthesis failed: %s", e)
            return f"I encountered an error while processing your question: {str(e)}"

    @staticmethod
    def _table_cell(value: Any) -> str:
        """Render one value as a Markdown table cell.

        ``None`` becomes ``NULL``; line breaks are flattened to spaces and
        ``|`` is backslash-escaped so the value cannot split the row.
        """
        if value is None:
            return "NULL"
        text = str(value).replace("\r\n", " ").replace("\n", " ").replace("\r", " ")
        return text.replace("|", "\\|")

    @classmethod
    def _table_row(cls, cells) -> str:
        """Join already-ordered cell values into one Markdown table row."""
        return "| " + " | ".join(cls._table_cell(cell) for cell in cells) + " |"

    def _format_sql_results(self, sql_results: List[Dict]) -> str:
        """Format SQL results for display.

        A single row with a single column is shown as a bold scalar.
        Anything else becomes a Markdown table of at most ``MAX_SQL_ROWS``
        rows, with columns taken from the first row. Each later row is
        looked up by column name, so a differing key order cannot misalign
        values; a missing key is shown as ``NULL``.
        """
        if not sql_results:
            return "No results found."
        try:
            first_row = sql_results[0]

            # If we have a single value, show it prominently.
            if len(sql_results) == 1 and len(first_row) == 1:
                value = next(iter(first_row.values()))
                return f"**Result: {value}**"

            headers = list(first_row)
            lines = [
                self._table_row(headers),
                self._table_row(["---"] * len(headers)),
            ]
            for row in sql_results[:self.MAX_SQL_ROWS]:
                lines.append(self._table_row(row.get(header) for header in headers))

            hidden = len(sql_results) - self.MAX_SQL_ROWS
            if hidden > 0:
                # Leading newline leaves a blank line between table and note.
                lines.append(f"\n*({hidden} more results not shown)*")
            return "\n".join(lines)
        except Exception as e:
            logger.exception("Could not format SQL results: %s", e)
            return f"Found {len(sql_results)} results (formatting error)"

    def _format_semantic_results(self, semantic_results: "List[SemanticSearchResult]") -> str:
        """Format semantic search results for display.

        Shows at most ``MAX_SEMANTIC_RESULTS`` numbered snippets, each
        truncated to ``MAX_SNIPPET_CHARS`` characters, followed by a
        relevance line when the score is positive. Missing (``None``)
        content or score is tolerated instead of aborting the whole list.
        """
        if not semantic_results:
            return "No related documents found."
        try:
            blocks = []
            shown = semantic_results[:self.MAX_SEMANTIC_RESULTS]
            for position, result in enumerate(shown, start=1):
                content = result.content
                content = "" if content is None else str(content)
                if len(content) > self.MAX_SNIPPET_CHARS:
                    content = content[:self.MAX_SNIPPET_CHARS] + "..."
                blocks.append(f"**{position}.** {content}")

                score = getattr(result, "score", None)
                if score is not None and score > 0:
                    blocks.append(f" *Relevance: {score:.2f}*")
            return "\n\n".join(blocks)
        except Exception as e:
            logger.exception("Could not format semantic results: %s", e)
            return f"Found {len(semantic_results)} related documents (formatting error)"

    def _format_schema_info(self, schema_info) -> str:
        """Format schema information for display.

        Reads the optional ``summary`` and ``tables`` attributes of
        ``schema_info``; each table entry is indexed by ``'table_name'``.
        An absent or empty attribute is skipped, so a missing summary no
        longer discards the table list.
        """
        try:
            parts = []

            summary = getattr(schema_info, "summary", None)
            if summary:
                parts.append(str(summary))

            tables = getattr(schema_info, "tables", None)
            if tables:
                table_names = [
                    str(table["table_name"])
                    for table in tables[:self.MAX_SCHEMA_TABLES]
                ]
                parts.append(f"**Available tables:** {', '.join(table_names)}")

            return "\n\n".join(parts) if parts else "Database structure information not available."
        except Exception as e:
            logger.exception("Could not format schema info: %s", e)
            return "Database structure information not available."

    def clean_markdown(self, text: str) -> str:
        """
        Clean and format markdown output.

        Extracted from llmDatabaseRouter._clean_markdown_output()

        Drops every non-ASCII character (lossy by design: accented letters
        and symbols are removed, not transliterated), strips leading and
        trailing whitespace from each line, collapses runs of blank lines
        into one, and removes leading and trailing blank lines. If cleaning
        fails, the input is returned unchanged.
        """
        try:
            ascii_only = text.encode('ascii', 'ignore').decode('ascii')

            kept = []
            for raw_line in ascii_only.split('\n'):
                line = raw_line.strip()
                # Keep text lines, plus a blank line only when it directly
                # follows a text line (never first, never doubled).
                if line or (kept and kept[-1]):
                    kept.append(line)

            # At most one blank line can remain at the end.
            while kept and not kept[-1]:
                kept.pop()

            return '\n'.join(kept)
        except Exception as e:
            logger.exception("Could not clean markdown: %s", e)
            return text