# query_processor.py • 34.1 kB
import asyncio
import json
import logging
import re
import time
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Tuple

from src.services.real_model_manager import real_model_manager as model_manager
from src.services.tool_registry import mongodb_tool_registry
# Optional hybrid intent classification (feature flag controlled).
# HYBRID_INTENT_AVAILABLE records whether the optional module could be
# imported; when it is False the hybrid names below are simply not defined.
try:
    from src.services.hybrid_intent_classification import (
        HybridConfig,
        HybridIntentClassificationService,
    )
except ImportError:
    HYBRID_INTENT_AVAILABLE = False
else:
    HYBRID_INTENT_AVAILABLE = True

logger = logging.getLogger(__name__)
class QueryProcessor:
    """Processes natural language queries and executes appropriate tools.

    ``process_query`` runs the pipeline:

    1. Classify the intent (regex, or the optional hybrid classifier when its
       feature flag is on).
    2. Extract entities.
    3. Map intent + entities to tool calls.
    4. Execute the calls through ``mongodb_tool_registry``.
    5. Phrase a reply: model output when a model loads, canned/template
       text otherwise.
    6. Package the structured results and metadata.
    """

    # Leading scaffolding the model sometimes emits before the real answer.
    _RESPONSE_ARTIFACTS = (
        "Answer:",
        "Response:",
        "Based on the data:",
        "According to the information provided:",
        "Here is the answer:",
    )
    # Sentences starting with these are the model echoing the prompt back.
    _PROMPT_ECHO_PREFIXES = ("User Question:", "Instructions:", "Query:", "Context:")
    # Order statuses recognised in queries; earlier entries win when several appear.
    _ORDER_STATUSES = ("pending", "processing", "shipped", "fulfilled", "cancelled")
    # Keys of the structured_data payload, in output order.
    _STRUCTURE_KEYS = ("product", "category", "period", "metrics", "filters", "results")
    # Tools whose raw result is forwarded (as a list) in structured_data["results"].
    _PASSTHROUGH_TOOLS = frozenset({
        "get_customer_info",
        "get_order_details",
        "get_product_analytics",
        "get_revenue_report",
        "get_product_data",
    })
    # Opening line of the model prompt per intent; other intents get a generic line.
    _INTENT_PROMPT_CONTEXT = {
        "greeting": "You are a friendly e-commerce business assistant. Respond naturally and warmly to the user's greeting.",
        "general_conversation": "You are a helpful e-commerce business assistant. Respond conversationally and offer to help with business analytics.",
        "sales_inquiry": "You are analyzing sales data for an e-commerce business.",
        "inventory_inquiry": "You are analyzing inventory levels for an e-commerce business.",
        "customer_inquiry": "You are analyzing customer data for an e-commerce business.",
        "order_inquiry": "You are analyzing order information for an e-commerce business.",
    }

    def __init__(self):
        # Token usage of the most recent model call; reset for every response.
        self._token_usage = None

        # Optional hybrid intent classification. It is constructed disabled,
        # so regex classification stays in charge until the flag is turned on.
        self.hybrid_intent_service = None
        if HYBRID_INTENT_AVAILABLE:
            try:
                config = HybridConfig(enabled=False)  # Feature flag OFF by default
                self.hybrid_intent_service = HybridIntentClassificationService(self, config)
                logger.info("Hybrid intent classification available (disabled by default)")
            except Exception as e:
                logger.warning(f"Hybrid intent classification initialization failed: {e}")
                self.hybrid_intent_service = None

        # Regex patterns per intent, tried in insertion order against the
        # lower-cased query; the first intent with a matching pattern wins.
        self.intent_patterns = {
            "greeting": [
                r"^(hi|hello|hey|good morning|good afternoon|good evening)(\s|$|,|!)",
                r"how are you|how's it going|what's up",
                r"^(greetings|salutations)(\s|$|,|!)",
                r"nice to meet you|pleasure to meet you"
            ],
            "general_conversation": [
                r"^(thank you|thanks|appreciate|grateful)",
                r"^(yes|no|okay|ok|sure|alright)(\s|$|,|!)",
                r"can you help|need help|assistance",
                r"what can you do|what are you|who are you"
            ],
            "sales_inquiry": [
                r"sales?|sold|revenue|earnings?|income",
                r"how (much|many).*(sold|sales?|revenue)",
                r"total (sales?|revenue|earnings?)"
            ],
            "product_inquiry": [
                r"how (many|much) products?",
                r"products?.*(have|total|count)",
                r"(list|show|get).*(products?)",
                r"products?.*(price|cost|expensive|cheap)",
                r"highest.*price|most expensive|cheapest",
                r"product.*(info|details|data|catalog)"
            ],
            "inventory_inquiry": [
                r"inventory|stock|out of stock|low stock",
                r"how (much|many).*(inventory|stock|available)",
                r"(low|out).*(stock|inventory)"
            ],
            "customer_inquiry": [
                r"customers?|clients?|buyers?",
                r"top customers?|best customers?",
                r"customer.*(info|details|data)"
            ],
            "order_inquiry": [
                r"orders?|purchases?",
                r"pending|shipped|fulfilled",
                r"order.*(status|details|info)"
            ],
            "analytics_inquiry": [
                r"analyz|trends?|insights?|reports?",
                r"compare|comparison|vs|versus",
                r"performance|metrics",
                r"which products?|what products?|top products?|best products?",
                r"most profit|highest profit|profitable|profitability",
                r"generated|earned|made.*profit",
                r"this week|last week|this month|last month"
            ]
        }

        self.entity_patterns = {
            # pattern -> (entity type, fixed day span or None when the span is
            # captured from the query itself).
            "time_periods": {
                r"last week": ("last_week", 7),
                r"last month": ("last_month", 30),
                r"this week": ("this_week", 7),
                r"this month": ("this_month", 30),
                r"last (\d+) days?": ("days", None),
                r"past (\d+) (weeks?|months?)": ("period", None)
            },
            # Default product patterns - these help identify common product mentions
            # In a production system, these could be dynamically loaded from the database
            "products": [
                r"shirt|t-shirt|tee",
                r"jeans|pants|trousers",
                r"iphone|phone|smartphone",
                r"laptop|computer|macbook",
                r"shoes|sneakers|footwear"
            ],
            # Default category patterns - these help classify product categories
            # These patterns work as fallbacks for common category names
            "categories": [
                r"electronics?",
                r"clothing|apparel|fashion",
                r"books?",
                r"home|garden",
                r"sports?"
            ]
        }

    async def process_query(self, query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Main query processing pipeline.

        Args:
            query: The user's natural-language question.
            context: Optional request context. Only ``shop_id`` is read; it is
                forwarded to every tool call.

        Returns:
            On success, a dict with ``success=True``, ``response``,
            ``structured_data``, ``metadata`` and ``debug`` (tool calls and
            raw results). Any exception is logged and converted to
            ``success=False`` with a generic ``response``, the ``error`` text
            and minimal ``metadata``.
        """
        # perf_counter is monotonic, so wall-clock changes cannot skew timings.
        started = time.perf_counter()
        try:
            # Awaited so the hybrid classifier (itself a coroutine) can run
            # inside this event loop; asyncio.run() cannot be used here.
            intent = await self._classify_intent_async(query)
            entities = self._extract_entities(query)
            tool_calls = self._select_tools(intent, entities, query, context)

            tool_results = []
            for tool_call in tool_calls:
                result = await mongodb_tool_registry.execute_tool(tool_call["tool"], tool_call["parameters"])
                tool_results.append(result)

            response_text = self._generate_response(query, intent, entities, tool_results)
            structured_data = self._structure_data(tool_results)
            execution_time = (time.perf_counter() - started) * 1000

            metadata = {
                "model_used": model_manager.active_model or "template-based",
                "execution_time_ms": int(execution_time),
                "tools_called": [tc["tool"] for tc in tool_calls],
                "confidence_score": self._calculate_confidence(intent, entities, tool_results),
                "query_intent": intent,
                "extracted_entities": list(entities) if entities else []
            }

            # No await since _generate_response, so this is this query's usage.
            token_usage = self._token_usage
            if token_usage:
                metadata["token_usage"] = token_usage
                total_tokens = token_usage.get("total_tokens") or 0
                if total_tokens > 0 and execution_time > 0:
                    metadata["tokens_per_second"] = round((total_tokens * 1000) / execution_time, 2)

            return {
                "success": True,
                "response": response_text,
                "structured_data": structured_data,
                "metadata": metadata,
                "debug": {
                    "tool_calls": tool_calls,
                    "tool_results": tool_results
                }
            }
        except Exception as e:
            logger.error(f"Query processing error: {e}", exc_info=True)
            execution_time = (time.perf_counter() - started) * 1000
            return {
                "success": False,
                "response": "I encountered an error processing your request. Please try rephrasing your question.",
                "error": str(e),
                "metadata": {
                    "execution_time_ms": int(execution_time),
                    "confidence_score": 0.0
                }
            }

    def _hybrid_enabled(self) -> bool:
        """True when the hybrid classifier exists and its feature flag is on."""
        return bool(self.hybrid_intent_service and self.hybrid_intent_service.config.enabled)

    @staticmethod
    def _event_loop_running() -> bool:
        """True when called from inside a running asyncio event loop."""
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return False
        return True

    async def _classify_intent_async(self, query: str) -> str:
        """Classify *query* from async code.

        Awaits the hybrid classifier when it is enabled. Any failure, or a
        disabled flag, falls back to the regex classifier.
        """
        if self._hybrid_enabled():
            try:
                intent = await self.hybrid_intent_service.classify_intent(query)
            except Exception as e:
                logger.warning(f"Hybrid classification failed, falling back to regex: {e}")
            else:
                logger.debug(f"Hybrid classification: '{query}' -> '{intent}'")
                return intent
        return self._classify_intent_regex(query)

    def _classify_intent(self, query: str) -> str:
        """Synchronously classify the intent of the query.

        When the hybrid classifier is enabled and no event loop is running,
        its coroutine is driven with ``asyncio.run``. Inside a running loop
        that is impossible (``asyncio.run`` raises), so the regex classifier
        is used instead; async callers should await ``_classify_intent_async``.
        """
        if self._hybrid_enabled():
            if self._event_loop_running():
                # Checked before creating the coroutine so none is left un-awaited.
                logger.warning(
                    "Hybrid classification cannot run synchronously inside an event loop; "
                    "use _classify_intent_async. Falling back to regex."
                )
            else:
                try:
                    intent = asyncio.run(self.hybrid_intent_service.classify_intent(query))
                except Exception as e:
                    logger.warning(f"Hybrid classification failed, falling back to regex: {e}")
                else:
                    logger.debug(f"Hybrid classification: '{query}' -> '{intent}'")
                    return intent
        return self._classify_intent_regex(query)

    def _classify_intent_regex(self, query: str) -> str:
        """Return the first intent whose patterns match, else ``general_inquiry``."""
        query_lower = query.lower()
        for intent, patterns in self.intent_patterns.items():
            for pattern in patterns:
                if re.search(pattern, query_lower):
                    return intent
        return "general_inquiry"

    # Methods to control the hybrid system at runtime
    async def enable_hybrid_intent_classification(self):
        """Enable hybrid classification.

        Returns the service's result, or False when no hybrid service exists.
        """
        if self.hybrid_intent_service:
            return await self.hybrid_intent_service.enable_hybrid_classification()
        return False

    def disable_hybrid_intent_classification(self):
        """Disable hybrid classification (regex classification takes over)."""
        if self.hybrid_intent_service:
            self.hybrid_intent_service.disable_hybrid_classification()

    def get_intent_classification_metrics(self) -> Dict[str, Any]:
        """Return hybrid classifier metrics, or a stub saying regex is in use."""
        if self.hybrid_intent_service:
            return self.hybrid_intent_service.hybrid_classifier.get_metrics()
        return {"hybrid_available": False, "using": "regex_patterns"}

    def _extract_entities(self, query: str) -> Dict[str, Any]:
        """Extract time period, product, category and numbers from the query.

        Possible keys:
        - ``time_period`` and either ``time_days`` (fixed span) or
          ``time_value``.
        - ``time_unit`` for "past N weeks/months".
        - ``product`` (matched text).
        - ``category`` (title-cased).
        - ``numbers`` (all standalone integers).
        """
        entities: Dict[str, Any] = {}
        query_lower = query.lower()

        # Only the first matching time pattern (dict order) is used.
        for pattern, (entity_type, days) in self.entity_patterns["time_periods"].items():
            match = re.search(pattern, query_lower)
            if not match:
                continue
            entities["time_period"] = entity_type
            if days is None and match.groups():
                entities["time_value"] = match.group(1)
                # "past N weeks/months" captures its unit too; keep it so the
                # date mapping can tell weeks from months.
                if len(match.groups()) >= 2:
                    entities["time_unit"] = match.group(2)
            else:
                entities["time_days"] = days
            break

        for pattern in self.entity_patterns["products"]:
            match = re.search(pattern, query_lower)
            if match:
                entities["product"] = match.group(0)
                break

        for pattern in self.entity_patterns["categories"]:
            match = re.search(pattern, query_lower)
            if match:
                entities["category"] = match.group(0).title()
                break

        numbers = re.findall(r'\b\d+\b', query)
        if numbers:
            entities["numbers"] = [int(n) for n in numbers]

        return entities

    @staticmethod
    def _with_entity_filters(base_params: Dict[str, Any], entities: Dict[str, Any]) -> Dict[str, Any]:
        """Copy *base_params* and add product/category filters when extracted."""
        params = base_params.copy()
        if "product" in entities:
            params["product"] = entities["product"]
        if "category" in entities:
            params["category"] = entities["category"]
        return params

    def _time_params(self, entities: Dict[str, Any]) -> Dict[str, str]:
        """Date-range parameters for *entities*, or {} when no period was found."""
        return self._map_time_period(entities) if "time_period" in entities else {}

    def _select_tools(self, intent: str, entities: Dict[str, Any], query: str, context: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """Map intent and entities to a list of ``{"tool", "parameters"}`` calls.

        ``shop_id`` from *context* is included in every call. Greetings,
        general conversation and ``general_inquiry`` need no data and get [].
        Any unrecognised intent falls back to ``get_sales_data``.
        """
        if intent in ("greeting", "general_conversation"):
            return []

        base_params: Dict[str, Any] = {}
        if context and context.get('shop_id'):
            base_params['shop_id'] = context['shop_id']
        query_lower = query.lower()

        if intent == "sales_inquiry":
            params = self._with_entity_filters(base_params, entities)
            params.update(self._time_params(entities))
            return [{"tool": "get_sales_data", "parameters": params}]

        if intent == "product_inquiry":
            params = self._with_entity_filters(base_params, entities)
            return [{"tool": "get_product_data", "parameters": params}]

        if intent == "inventory_inquiry":
            params = self._with_entity_filters(base_params, entities)
            if entities.get("numbers"):
                # The first number in the query is taken as the stock threshold.
                params["low_stock_threshold"] = entities["numbers"][0]
            return [{"tool": "get_inventory_status", "parameters": params}]

        if intent == "customer_inquiry":
            # TODO: extract specific customer IDs/emails when the query names one.
            params = dict(base_params, include_orders=True)
            return [{"tool": "get_customer_info", "parameters": params}]

        if intent == "order_inquiry":
            params = base_params.copy()
            # Whole words only, so e.g. "spending" does not imply "pending".
            status = next(
                (s for s in self._ORDER_STATUSES if re.search(rf"\b{s}\b", query_lower)),
                None,
            )
            if status:
                params["status"] = status
            params.update(self._time_params(entities))
            return [{"tool": "get_order_details", "parameters": params}]

        if intent == "analytics_inquiry":
            params = self._with_entity_filters(base_params, entities)
            params.update(self._time_params(entities))
            # Product-oriented wording -> product analytics, otherwise revenue report.
            if re.search(r"product|performance|best|top|profit|which.*products?|what.*products?", query_lower):
                tool = "get_product_analytics"
            else:
                tool = "get_revenue_report"
            return [{"tool": tool, "parameters": params}]

        if intent == "general_inquiry":
            return []

        # Default fallback for intents not handled above (business queries only).
        return [{"tool": "get_sales_data", "parameters": base_params}]

    @staticmethod
    def _period_length_days(entities: Dict[str, Any]) -> Optional[int]:
        """Number of days a rolling time-period entity spans, or None if unknown."""
        period = entities.get("time_period")
        if period == "last_week":
            return 7
        if period == "last_month":
            return 30
        if period not in ("days", "period"):
            return None
        try:
            count = int(entities.get("time_value"))
        except (TypeError, ValueError):
            return None
        if period == "days":
            return count
        unit = str(entities.get("time_unit") or "")
        if unit.startswith("week"):
            return count * 7
        if unit.startswith("month"):
            return count * 30  # same 30-day month as "last month"
        return None

    def _map_time_period(self, entities: Dict[str, Any]) -> Dict[str, str]:
        """Map time-period entities to ISO ``start_date``/``end_date`` params.

        Calendar periods start at the first day of the current week (Monday)
        or month. Rolling periods ("last week", "last N days", "past N
        weeks/months") count back from today. Returns {} when the period
        cannot be interpreted.
        """
        period = entities.get("time_period")
        end_date = datetime.now()
        if period == "this_month":
            start_date = end_date.replace(day=1)
        elif period == "this_week":
            start_date = end_date - timedelta(days=end_date.weekday())
        else:
            span = self._period_length_days(entities)
            if span is None:
                return {}
            start_date = end_date - timedelta(days=span)
        return {
            "start_date": start_date.date().isoformat(),
            "end_date": end_date.date().isoformat(),
        }

    @staticmethod
    def _generation_settings(intent: str) -> Tuple[int, float]:
        """(max_tokens, temperature) for the model, tuned per intent for speed."""
        if intent in ("greeting", "general_conversation"):
            return 30, 0.5
        if intent in ("product_inquiry", "inventory_inquiry"):
            return 50, 0.2  # Short factual responses
        return 80, 0.3  # Moderate length for other queries

    def _fallback_response(self, query: str, intent: str, entities: Dict[str, Any], tool_results: List[Dict[str, Any]]) -> str:
        """Model-free reply: canned text for chit-chat, templates for data intents."""
        if intent == "greeting":
            return self._generate_greeting_response(query)
        if intent == "general_conversation":
            return self._generate_conversational_response(query, entities)
        return self._generate_template_response(intent, tool_results)

    def _generate_response(
        self,
        query: str,
        intent: str,
        entities: Dict[str, Any],
        tool_results: List[Dict[str, Any]]
    ) -> str:
        """Generate the natural-language reply.

        Help requests (``general_inquiry`` with no data) get the static help
        text. Otherwise the best model is auto-loaded and prompted. If no
        model loads, inference fails, or the cleaned answer is empty, the
        model-free fallback is used. Sets ``self._token_usage`` when the
        model ran, and clears it otherwise.
        """
        # Clear any usage left over from a previous query.
        self._token_usage = None

        if intent == "general_inquiry" and not tool_results:
            return self._generate_help_response()

        try:
            model_ready = model_manager.auto_load_best_model(query)
        except Exception as e:
            logger.warning(f"Model auto-loading failed: {e}, using template response")
            return self._fallback_response(query, intent, entities, tool_results)
        if not model_ready:
            logger.warning("No suitable model available, using template response")
            return self._fallback_response(query, intent, entities, tool_results)

        max_tokens, temperature = self._generation_settings(intent)
        try:
            prompt = self._create_model_prompt(query, intent, entities, tool_results)
            logger.info(f"Starting model inference with active model: {model_manager.active_model}")
            result = model_manager.inference(prompt, max_tokens=max_tokens, temperature=temperature)
            logger.info("Model inference completed successfully")
            response_text = self._clean_model_response(result["text"])
        except Exception as e:
            logger.error(f"Model generation error: {e}")
            return self._fallback_response(query, intent, entities, tool_results)

        # Tokens were spent even if the cleaned text turns out empty.
        self._token_usage = result.get("token_usage")
        if not response_text:
            logger.warning("Model response was empty after cleanup, using template response")
            return self._fallback_response(query, intent, entities, tool_results)
        return response_text

    def _generate_greeting_response(self, query: str) -> str:
        """Canned reply to a greeting, chosen by the greeting phrase used."""
        query_lower = query.lower().strip()
        if "how are you" in query_lower or "how's it going" in query_lower:
            return "Hello! I'm doing well, thank you for asking. I'm here to help you analyze your e-commerce data. How can I assist you today?"
        if any(greeting in query_lower for greeting in ("good morning", "good afternoon", "good evening")):
            return "Good day! I'm ready to help you with your business analytics and data insights. What would you like to know?"
        if "what's up" in query_lower:
            return "Hello! Not much, just ready to help you dive into your business data. What would you like to explore today?"
        return "Hello! I'm your e-commerce data assistant. I can help you analyze sales, inventory, customers, orders, and business performance. What would you like to know?"

    def _generate_conversational_response(self, query: str, entities: Dict[str, Any]) -> str:
        """Canned reply to thanks, acknowledgements and capability questions.

        *entities* is accepted for interface symmetry but not used.
        """
        query_lower = query.lower().strip()
        if query_lower.startswith(("thank", "thanks", "appreciate")):
            return "You're very welcome! I'm here whenever you need insights about your business data. Feel free to ask me anything about your sales, inventory, customers, or orders."
        if query_lower in ["yes", "no", "okay", "ok", "sure", "alright"]:
            return "Great! What would you like to explore about your business today? I can analyze sales trends, inventory levels, customer behavior, or order patterns."
        if "can you help" in query_lower or "need help" in query_lower:
            return "Absolutely! I can help you analyze your e-commerce data. Try asking me about sales performance, inventory status, top customers, recent orders, or business trends."
        if "what can you do" in query_lower or "what are you" in query_lower:
            return "I'm your e-commerce data analyst! I can help you understand your business performance by analyzing sales data, tracking inventory levels, identifying top customers, monitoring orders, and generating insights to help you make better business decisions."
        return "I'm here to help with your business analytics. You can ask me about sales, inventory, customers, orders, or any specific business metrics you'd like to explore."

    def _generate_help_response(self) -> str:
        """Static help text listing what the assistant can do."""
        return (
            "I'm your e-commerce business analyst. I can help you with:\n"
            "• Sales analysis and revenue reports\n"
            "• Inventory monitoring and stock alerts\n"
            "• Customer insights and top buyer identification\n"
            "• Order tracking and fulfillment status\n"
            "• Business performance analytics\n"
            "\n"
            "Just ask me about any aspect of your business data!"
        )

    def _create_model_prompt(
        self,
        query: str,
        intent: str,
        entities: Dict[str, Any],
        tool_results: List[Dict[str, Any]]
    ) -> str:
        """Build the model prompt.

        The prompt is:
        - an intent-specific role line;
        - the successful tool data as JSON, or a must-report-failure notice
          when only failed tool results exist;
        - the user's query;
        - terse answering instructions (shorter for chit-chat).
        """
        successful_results = [r.get('result', {}) for r in tool_results if r.get('success')]

        context_parts = [self._INTENT_PROMPT_CONTEXT.get(intent, "You are analyzing e-commerce business data.")]
        if successful_results:
            # default=str: tool payloads may hold values json cannot encode
            # natively (presumably dates/ObjectIds from MongoDB — unverified);
            # render them as text instead of failing the whole query.
            context_parts.append(f"Data available: {json.dumps(successful_results, indent=2, default=str)}")
        elif any(not r.get('success') for r in tool_results):
            context_parts.append("IMPORTANT: Data retrieval failed. No actual data is available.")
            context_parts.append("You MUST inform the user that the data could not be retrieved.")
        context = "\n".join(context_parts)

        if intent in ("greeting", "general_conversation"):
            # Shorter prompt for greetings/conversations for faster response
            lines = [
                context,
                f"User: {query}",
                "Instructions: Reply naturally and briefly (under 30 words). Offer to help with their e-commerce business.",
                "Response:",
            ]
        else:
            # Full prompt for business queries
            lines = [
                context,
                f"User Question: {query}",
                "Instructions:",
                "- Give a SHORT, DIRECT answer in 1-2 sentences maximum",
                "- State the main fact ONCE - do NOT repeat the same information",
                "- Use ONLY the specific numbers from the data",
                "- STOP after answering - do NOT continue or elaborate",
                '- Example good answer: "You have 107 products with prices ranging from $3 to $100,000."',
                '- Example bad answer: "You have 107 products. Based on the data, there are 107 products. The total is 107 products."',
                "Answer:",
            ]
        return "\n".join(lines)

    @classmethod
    def _strip_artifacts(cls, text: str) -> str:
        """Remove leading scaffolding prefixes (possibly several) from *text*."""
        while True:
            prefix = next((a for a in cls._RESPONSE_ARTIFACTS if text.startswith(a)), None)
            if prefix is None:
                return text
            text = text[len(prefix):].strip()

    def _clean_model_response(self, response: str) -> str:
        """Clean up model response: remove repetition and artifacts.

        Works sentence by sentence:
        - Strips scaffolding prefixes such as ``Answer:``.
        - Drops prompt echoes.
        - Drops sentences that repeat an earlier one (compared on their first
          20 lower-cased characters).
        - Stops once two or more sentences contain a digit and one of
          'have'/'are'/'total'.
        """
        kept: List[str] = []
        kept_lower: List[str] = []
        for raw in re.split(r'(?<=[.!?])\s+', response.strip()):
            # Strip scaffolding first so "Answer: X." and "X." compare equal.
            sentence = self._strip_artifacts(raw.strip())
            if not sentence or sentence.startswith(self._PROMPT_ECHO_PREFIXES):
                continue
            lowered = sentence.lower()
            if any(lowered.startswith(seen[:20]) or seen.startswith(lowered[:20]) for seen in kept_lower):
                continue
            kept.append(sentence)
            kept_lower.append(lowered)
            # Stop if we have a complete answer (avoid repetition)
            if len(kept) >= 2 and sentence.endswith('.'):
                combined = ' '.join(kept)
                if ('have' in combined or 'are' in combined or 'total' in combined) and \
                        any(ch.isdigit() for ch in combined):
                    break
        return ' '.join(kept)

    def _generate_template_response(self, intent: str, tool_results: List[Dict[str, Any]]) -> str:
        """Template reply built from the first successful tool result.

        Missing fields fall back to neutral defaults instead of raising.
        """
        successful = next((r for r in tool_results or [] if r.get('success')), None)
        if successful is None:
            return "I wasn't able to retrieve the requested data. Please try rephrasing your question or check if the data exists."

        result_data = successful.get('result', {})
        if not isinstance(result_data, dict):
            result_data = {}

        if intent == "sales_inquiry" and 'total_revenue' in result_data:
            return f"Based on your sales data, you generated ${result_data['total_revenue']} in revenue with {result_data.get('total_quantity', 0)} units sold."

        if intent == "inventory_inquiry" and 'low_stock_items' in result_data:
            low_stock_count = len(result_data['low_stock_items'] or [])
            if low_stock_count > 0:
                return f"You have {low_stock_count} products with low stock levels. The most critical items need immediate attention."
            return "All your products are currently well-stocked above the minimum threshold."

        if intent == "customer_inquiry" and result_data.get('customers'):
            top_customer = result_data['customers'][0]
            if isinstance(top_customer, dict):
                return f"Your top customer is {top_customer.get('name', 'Unknown')} with ${top_customer.get('total_spent', 0)} in total purchases across {top_customer.get('total_orders', 0)} orders."

        if intent == "order_inquiry" and isinstance(result_data.get('summary'), dict):
            summary = result_data['summary']
            return f"You have {summary.get('total_orders', 0)} orders with a total value of ${summary.get('total_value', 0)}. The average order value is ${summary.get('average_order_value', 0)}."

        return "I've retrieved your data successfully. Please review the detailed information provided."

    def _prepare_model_context(
        self,
        query: str,
        intent: str,
        entities: Dict[str, Any],
        tool_results: List[Dict[str, Any]]
    ) -> str:
        """Describe query, intent, entities and successful tool data as plain text."""
        context_parts = [f"User asked: {query}", f"Query intent: {intent}"]
        if entities:
            context_parts.append(f"Extracted entities: {', '.join(f'{k}: {v}' for k, v in entities.items())}")
        for result in tool_results:
            if result.get('success'):
                tool_name = result.get('tool', 'unknown')
                context_parts.append(f"Data from {tool_name}: {json.dumps(result.get('result', {}), indent=2, default=str)}")
        return "\n".join(context_parts)

    def _structure_data(self, tool_results: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """Structure tool results into the fixed ``structured_data`` shape.

        Always returns a dict with every key in ``_STRUCTURE_KEYS`` (None when
        no tool filled it).
        - Sales data fills ``metrics`` and ``results``.
        - Inventory fills a one-item ``results`` summary.
        - Other known tools pass their payload through as a ``results`` list.
        """
        structured: Dict[str, Any] = dict.fromkeys(self._STRUCTURE_KEYS)

        for result in tool_results or []:
            if not result.get('success'):
                continue
            tool_name = result.get('tool')
            result_data = result.get('result', {})

            if tool_name == "get_sales_data":
                data = result_data if isinstance(result_data, dict) else {}
                structured["metrics"] = {
                    "quantity": data.get('total_quantity'),
                    "revenue": data.get('total_revenue'),
                    "average_price": data.get('average_order_value')
                }
                structured["results"] = data.get('breakdown', [])
            elif tool_name == "get_inventory_status":
                data = result_data if isinstance(result_data, dict) else {}
                structured["results"] = [{
                    "inventory_summary": {
                        "total_products": data.get('total_products'),
                        "low_stock_count": data.get('low_stock_count'),
                        "out_of_stock_count": data.get('out_of_stock_count')
                    },
                    "critical_items": (data.get('low_stock_items') or [])[:5]
                }]
            elif tool_name in self._PASSTHROUGH_TOOLS:
                # Ensure results is always a list for API validation
                if isinstance(result_data, dict):
                    structured["results"] = [result_data]
                elif isinstance(result_data, list):
                    structured["results"] = result_data
                else:
                    structured["results"] = [{"data": result_data}]

        return structured

    def _calculate_confidence(
        self,
        intent: str,
        entities: Dict[str, Any],
        tool_results: List[Dict[str, Any]]
    ) -> float:
        """Heuristic confidence in [0, 1].

        Chit-chat scores 0.95 and plain help 0.9. Other queries start at 0.5:
        +0.2 for a specific intent, +0.1 per entity (max 3),
        +0.2 per successful tool (max 2), and -0.1 per failed tool.
        """
        if intent in ("greeting", "general_conversation"):
            return 0.95
        if intent == "general_inquiry" and not tool_results:
            return 0.9

        confidence = 0.5
        if intent != "general_inquiry":
            confidence += 0.2
        if entities:
            confidence += 0.1 * min(len(entities), 3)

        successful_tools = sum(1 for r in tool_results if r.get('success'))
        failed_tools = len(tool_results) - successful_tools
        if successful_tools > 0:
            confidence += 0.2 * min(successful_tools, 2)
        confidence -= 0.1 * failed_tools

        return min(max(confidence, 0.0), 1.0)
# Module-level QueryProcessor, constructed once when this module is imported
# (QueryProcessor.__init__ runs at import time).
query_processor: QueryProcessor = QueryProcessor()