from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict, List, Optional, Tuple

import structlog
# Module-level structured logger (structlog); call sites attach extra context
# as keyword arguments, e.g. ``logger.warning(msg, error=...)``.
logger = structlog.get_logger(__name__)
class AdaptiveSearchEngine:
    """
    Adaptive search engine that tries multiple strategies when initial queries fail.

    Targets searches that come back empty even though the table holds data:
    strategies are tried in order (full filters, relaxed time range, no time
    filter, last 24 hours only, plain table count) and the first result that
    passes ``_has_meaningful_results`` is returned. If none does, diagnostics
    about the table plus human-readable suggestions are returned instead.
    """

    def __init__(self, athena_client, query_builder):
        """
        Args:
            athena_client: Object with an async ``execute_query(query, params)``
                method that returns a list of row dicts.
            query_builder: Object whose ``database`` attribute names the
                database containing the searched tables.
        """
        self.athena_client = athena_client
        self.query_builder = query_builder

    async def adaptive_search(
        self,
        search_function: Callable,
        search_params: Dict[str, Any],
        table_name: str
    ) -> Dict[str, Any]:
        """
        Try multiple search strategies until one yields meaningful results.

        Args:
            search_function: Async callable invoked as ``search_function(**params)``.
            search_params: Keyword arguments for ``search_function``. Not mutated;
                each strategy works on its own copy.
            table_name: Table used by the count fallback and the diagnostics.

        Returns:
            On success: ``{"success": True, "results", "strategy_used",
            "strategies_attempted"}``. Otherwise: ``{"success": False,
            "results": [], "error", "diagnostics", "strategies_attempted",
            "suggestions"}``.
        """
        strategies = [
            ("full_filters", lambda: search_function(**search_params)),
            ("relaxed_time", lambda: self._search_with_relaxed_time(search_function, search_params)),
            ("no_time_filter", lambda: self._search_without_time_filter(search_function, search_params)),
            ("recent_data_only", lambda: self._search_recent_data_only(search_function, search_params)),
            ("simple_count", lambda: self._get_table_count(table_name)),
        ]

        results: Any = []
        # Tracked explicitly: the old ``len(results)`` check crashed when a
        # strategy returned a meaningful int.
        strategy_used: Optional[str] = None
        strategy_results: Dict[str, Dict[str, Any]] = {}

        for strategy_name, strategy_func in strategies:
            try:
                logger.info(f"Trying search strategy: {strategy_name}")
                strategy_result = await strategy_func()
                strategy_results[strategy_name] = {
                    "success": True,
                    "result_count": self._result_count(strategy_result),
                    "strategy": strategy_name
                }
                if self._has_meaningful_results(strategy_result):
                    results = strategy_result
                    strategy_used = strategy_name
                    logger.info(
                        f"Strategy '{strategy_name}' succeeded with "
                        f"{len(results) if isinstance(results, list) else 'N/A'} results"
                    )
                    break
            except Exception as e:
                # Strategy boundary: record the failure and move on to the next one.
                logger.warning(f"Strategy '{strategy_name}' failed", error=str(e))
                strategy_results[strategy_name] = {
                    "success": False,
                    "error": str(e),
                    "strategy": strategy_name
                }

        if strategy_used is None:
            # Nothing worked: explain why instead of returning a bare empty list.
            diagnostic_info = await self._get_diagnostic_info(table_name, search_params)
            return {
                "success": False,
                "results": [],
                "error": "No data found with any search strategy",
                "diagnostics": diagnostic_info,
                "strategies_attempted": strategy_results,
                "suggestions": self._generate_search_suggestions(diagnostic_info)
            }

        return {
            "success": True,
            "results": results,
            "strategy_used": strategy_used,
            "strategies_attempted": strategy_results
        }

    @staticmethod
    def _result_count(result: Any) -> Any:
        """
        Best-effort record count of a strategy result, for bookkeeping only.

        Lists report their length, dicts their ``count`` entry (default 0),
        non-bool ints themselves; anything else (e.g. ``None``) reports 0.
        Never raises.
        """
        if isinstance(result, list):
            return len(result)
        if isinstance(result, dict):
            return result.get("count", 0)
        if isinstance(result, int) and not isinstance(result, bool):
            return result
        return 0

    def _has_meaningful_results(self, result: Any) -> bool:
        """
        Check if result contains meaningful data.

        - list: non-empty
        - dict: truthy ``success`` and a numeric ``count`` > 0
          (a missing/``None`` or non-numeric count is not meaningful)
        - int: > 0
        - anything else: False
        """
        if isinstance(result, list):
            return len(result) > 0
        if isinstance(result, dict):
            if not result.get("success", False):
                return False
            count = result.get("count") or 0
            try:
                return count > 0
            except TypeError:
                return False
        if isinstance(result, int):
            return result > 0
        return False

    @staticmethod
    def _utc_now() -> datetime:
        """Current UTC time as a naive datetime (same value ``datetime.utcnow()`` returned)."""
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _parse_timestamp(value: Any) -> datetime:
        """
        Convert a time-filter value into a datetime.

        ``datetime`` instances are returned unchanged. Strings are parsed with
        ``dateutil`` when it is installed, otherwise with
        ``datetime.fromisoformat``.

        Raises:
            TypeError: for unsupported value types.
            ValueError, OverflowError: if a string cannot be parsed.
        """
        if isinstance(value, datetime):
            return value
        if not isinstance(value, str):
            raise TypeError(f"Unsupported timestamp type: {type(value).__name__}")
        try:
            from dateutil import parser as date_parser
        except ImportError:
            return datetime.fromisoformat(value)
        return date_parser.parse(value)

    async def _search_with_relaxed_time(
        self,
        search_function: Callable,
        params: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """
        Try search with an expanded time range.

        ``start_time`` is moved 30 days earlier and ``end_time`` 7 days later
        (both written back as ISO-8601 strings). If a value cannot be parsed,
        the start falls back to now - 90 days and the end to now (UTC).
        """
        relaxed_params = params.copy()

        if "start_time" in relaxed_params:
            try:
                start_dt = self._parse_timestamp(relaxed_params["start_time"])
                relaxed_params["start_time"] = (start_dt - timedelta(days=30)).isoformat()
            except (ValueError, TypeError, OverflowError):
                # Unparseable start: use a very broad window instead.
                relaxed_params["start_time"] = (self._utc_now() - timedelta(days=90)).isoformat()

        if "end_time" in relaxed_params:
            try:
                end_dt = self._parse_timestamp(relaxed_params["end_time"])
                relaxed_params["end_time"] = (end_dt + timedelta(days=7)).isoformat()
            except (ValueError, TypeError, OverflowError):
                relaxed_params["end_time"] = self._utc_now().isoformat()

        return await search_function(**relaxed_params)

    async def _search_without_time_filter(
        self,
        search_function: Callable,
        params: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """
        Try search without time filters.

        ``limit`` is capped at 50 (or set to 50 when missing/``None``) because
        the unfiltered scan can be much larger.
        """
        no_time_params = params.copy()
        no_time_params.pop("start_time", None)
        no_time_params.pop("end_time", None)
        requested_limit = no_time_params.get("limit")
        no_time_params["limit"] = 50 if requested_limit is None else min(requested_limit, 50)
        return await search_function(**no_time_params)

    async def _search_recent_data_only(
        self,
        search_function: Callable,
        params: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Try search with only recent data (exactly the last 24 hours, UTC)."""
        now = self._utc_now()  # single reading so the window is exactly 24h
        recent_params = params.copy()
        recent_params["start_time"] = (now - timedelta(hours=24)).isoformat()
        recent_params["end_time"] = now.isoformat()
        return await search_function(**recent_params)

    def _qualified_table(self, table_name: str) -> str:
        """
        Return ``"database"."table"`` for use in SQL.

        Embedded double quotes are doubled, the standard escape for quoted
        identifiers, so a crafted name cannot break out of the identifier.
        """
        def quote(identifier: Any) -> str:
            return '"' + str(identifier).replace('"', '""') + '"'

        return f"{quote(self.query_builder.database)}.{quote(table_name)}"

    async def _get_table_count(self, table_name: str) -> Dict[str, Any]:
        """
        Get simple record count to verify data exists.

        Returns:
            ``{"count": int, "table_has_data": bool}``, plus ``"error"`` when
            the query failed (in which case count is 0).
        """
        try:
            count_query = f"""
                SELECT COUNT(*) as record_count
                FROM {self._qualified_table(table_name)}
                LIMIT 1
                """
            result = await self.athena_client.execute_query(count_query, [])
            if result:
                count = int(result[0].get("record_count") or 0)
                return {"count": count, "table_has_data": count > 0}
            return {"count": 0, "table_has_data": False}
        except Exception as e:
            logger.error("Failed to get table count", table=table_name, error=str(e))
            return {"count": 0, "table_has_data": False, "error": str(e)}

    @staticmethod
    def _record_diagnostic_error(diagnostics: Dict[str, Any], check: str, error: Exception) -> None:
        """Log a failed diagnostic check; keep the first error under ``diagnostic_error``."""
        logger.error("Failed to get diagnostic info", check=check, error=str(error))
        diagnostics.setdefault("diagnostic_error", str(error))

    async def _get_diagnostic_info(
        self,
        table_name: str,
        search_params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Get diagnostic information about the table and search parameters.

        Each check runs independently so one failing query does not hide the
        others. The freshness query assumes the table has a ``time`` column;
        if it does not, that check fails on its own and the rest still run.
        """
        diagnostics: Dict[str, Any] = {
            "table_name": table_name,
            "search_parameters": search_params
        }

        # Table record count
        try:
            count_result = await self._get_table_count(table_name)
            diagnostics["table_record_count"] = count_result.get("count", 0)
            diagnostics["table_has_data"] = count_result.get("table_has_data", False)
            if count_result.get("error"):
                # Distinguishes "count query failed" from "table is empty".
                diagnostics["table_count_error"] = count_result["error"]
        except Exception as e:
            self._record_diagnostic_error(diagnostics, "table_count", e)

        # Data freshness info
        try:
            freshness_query = f"""
                SELECT
                    MAX(time) as latest_record,
                    MIN(time) as earliest_record,
                    COUNT(DISTINCT DATE(time)) as days_with_data
                FROM {self._qualified_table(table_name)}
                WHERE time IS NOT NULL
                LIMIT 1
                """
            freshness_result = await self.athena_client.execute_query(freshness_query, [])
            if freshness_result:
                row = freshness_result[0]
                diagnostics["data_freshness"] = {
                    "latest_record": row.get("latest_record"),
                    "earliest_record": row.get("earliest_record"),
                    "days_with_data": row.get("days_with_data")
                }
        except Exception as e:
            self._record_diagnostic_error(diagnostics, "data_freshness", e)

        # Check if a severity filter might be too restrictive
        if search_params.get("severity"):
            try:
                severity_query = f"""
                    SELECT severity, COUNT(*) as count
                    FROM {self._qualified_table(table_name)}
                    WHERE severity IS NOT NULL
                    GROUP BY severity
                    ORDER BY count DESC
                    LIMIT 10
                    """
                severity_result = await self.athena_client.execute_query(severity_query, [])
                diagnostics["available_severities"] = severity_result
            except Exception as e:
                self._record_diagnostic_error(diagnostics, "available_severities", e)

        return diagnostics

    def _generate_search_suggestions(self, diagnostics: Dict[str, Any]) -> List[str]:
        """Generate actionable suggestions based on diagnostic information."""
        suggestions: List[str] = []

        table_count = diagnostics.get("table_record_count", 0)
        if diagnostics.get("table_count_error"):
            # A failed COUNT(*) says nothing about emptiness; don't claim it is empty.
            suggestions.append("Could not count table records - verify the table name and query permissions")
        elif table_count == 0:
            suggestions.append("Table appears to be empty - verify data ingestion is working")
            suggestions.append("Check if Security Lake is properly configured and receiving data")
        elif table_count > 0:
            suggestions.append(f"Table contains {table_count} records but search filters may be too restrictive")

        # Time range suggestions
        freshness = diagnostics.get("data_freshness") or {}
        latest = freshness.get("latest_record")
        earliest = freshness.get("earliest_record")
        if latest and earliest:
            suggestions.append(f"Data available from {earliest} to {latest}")
            suggestions.append("Try expanding your time range or removing time filters")

        # Severity suggestions (values stringified: they may not be str)
        available_severities = diagnostics.get("available_severities") or []
        severity_list = [
            str(row.get("severity"))
            for row in available_severities
            if isinstance(row, dict) and row.get("severity")
        ]
        if severity_list:
            suggestions.append(f"Available severity levels: {', '.join(severity_list)}")

        suggestions.append("Try removing specific filters and gradually add them back")
        suggestions.append("Use the 'list_data_sources' tool to verify table schema")

        return suggestions