Skip to main content
Glama
kebabmane

Amazon Security Lake MCP Server

by kebabmane
adaptive_search.py (10.7 kB)
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple, Callable

import structlog

logger = structlog.get_logger(__name__)


class AdaptiveSearchEngine:
    """
    Adaptive search engine that tries multiple strategies when initial queries fail.

    Strategies are attempted in order, from the caller's exact filters to
    progressively looser ones, stopping at the first strategy that yields
    meaningful results. When every strategy comes back empty, diagnostic
    queries are run against the table so the caller gets actionable hints
    instead of a bare empty result.
    """

    def __init__(self, athena_client, query_builder):
        """
        Store the collaborators used to run queries.

        Args:
            athena_client: Object exposing an awaitable
                ``execute_query(sql, params)`` method.
            query_builder: Object exposing a ``database`` attribute that names
                the database the tables live in.
        """
        self.athena_client = athena_client
        self.query_builder = query_builder

    async def adaptive_search(
        self,
        search_function: Callable,
        search_params: Dict[str, Any],
        table_name: str
    ) -> Dict[str, Any]:
        """
        Try multiple search strategies to find data.

        Args:
            search_function: Awaitable callable invoked as
                ``search_function(**params)``.
            search_params: Keyword arguments for ``search_function``. The
                mapping itself is never mutated; relaxed strategies work on
                shallow copies.
            table_name: Table used for the count and diagnostic queries.

        Returns:
            On success: ``{"success": True, "results", "strategy_used",
            "strategies_attempted"}``. On failure: ``{"success": False,
            "results": [], "error", "diagnostics", "strategies_attempted",
            "suggestions"}``.
        """
        strategies = [
            ("full_filters", lambda: search_function(**search_params)),
            ("relaxed_time", lambda: self._search_with_relaxed_time(search_function, search_params)),
            ("no_time_filter", lambda: self._search_without_time_filter(search_function, search_params)),
            ("recent_data_only", lambda: self._search_recent_data_only(search_function, search_params)),
            ("simple_count", lambda: self._get_table_count(table_name)),
        ]

        results: Any = []
        # Tracked explicitly rather than relying on the loop variable leaking
        # out of the ``for`` statement.
        strategy_used: Optional[str] = None
        strategy_results: Dict[str, Dict[str, Any]] = {}

        for strategy_name, strategy_func in strategies:
            try:
                logger.info(f"Trying search strategy: {strategy_name}")
                strategy_result = await strategy_func()
            except Exception as e:
                # A failing strategy must not abort the whole search; record
                # the failure and move on to the next, looser strategy.
                logger.warning(f"Strategy '{strategy_name}' failed", error=str(e))
                strategy_results[strategy_name] = {
                    "success": False,
                    "error": str(e),
                    "strategy": strategy_name,
                }
                continue

            strategy_results[strategy_name] = {
                "success": True,
                "result_count": self._result_count(strategy_result),
                "strategy": strategy_name,
            }

            if self._has_meaningful_results(strategy_result):
                results = strategy_result
                strategy_used = strategy_name
                logger.info(
                    f"Strategy '{strategy_name}' succeeded with "
                    f"{len(results) if isinstance(results, list) else 'N/A'} results"
                )
                break

        # If still no results, provide diagnostic information.
        if strategy_used is None:
            diagnostic_info = await self._get_diagnostic_info(table_name, search_params)
            return {
                "success": False,
                "results": [],
                "error": "No data found with any search strategy",
                "diagnostics": diagnostic_info,
                "strategies_attempted": strategy_results,
                "suggestions": self._generate_search_suggestions(diagnostic_info),
            }

        return {
            "success": True,
            "results": results,
            "strategy_used": strategy_used,
            "strategies_attempted": strategy_results,
        }

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive datetime.

        Equivalent to the deprecated ``datetime.utcnow()``; the result stays
        naive so ``isoformat()`` output keeps its offset-free shape.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _quote_identifier(identifier: Any) -> str:
        """Return ``identifier`` as a double-quoted SQL identifier.

        Embedded double quotes are doubled so the value cannot terminate the
        quoted identifier early.
        """
        return '"' + str(identifier).replace('"', '""') + '"'

    def _qualified_table(self, table_name: str) -> str:
        """Return the quoted ``"database"."table"`` reference for ``table_name``."""
        return (
            f"{self._quote_identifier(self.query_builder.database)}"
            f".{self._quote_identifier(table_name)}"
        )

    @staticmethod
    def _result_count(result: Any) -> int:
        """Return a best-effort record count for a strategy result without raising."""
        if isinstance(result, list):
            return len(result)
        if isinstance(result, dict):
            return result.get("count", 0)
        if isinstance(result, int):
            return result
        return 0

    def _has_meaningful_results(self, result) -> bool:
        """Check if result contains meaningful data.

        A list is meaningful when non-empty, a dict when it reports
        ``success`` and a positive ``count``, and an int when positive.
        Anything else (including ``None``) is not meaningful.
        """
        if isinstance(result, list):
            return len(result) > 0
        if isinstance(result, dict):
            # ``or 0`` guards against an explicit ``"count": None``.
            return bool(result.get("success", False)) and (result.get("count") or 0) > 0
        if isinstance(result, int):
            return result > 0
        return False

    def _shift_time(self, value: Any, delta: timedelta, fallback: datetime) -> str:
        """Parse ``value`` as a timestamp, shift it by ``delta``, return ISO text.

        Falls back to ``fallback.isoformat()`` when ``dateutil`` is not
        importable or the value cannot be parsed or shifted.
        """
        try:
            from dateutil import parser as date_parser
            return (date_parser.parse(value) + delta).isoformat()
        except Exception as e:
            # Best effort: an unparseable timestamp should widen the search,
            # not fail the strategy.
            logger.debug("Could not relax time value, using fallback", value=str(value), error=str(e))
            return fallback.isoformat()

    async def _search_with_relaxed_time(
        self,
        search_function: Callable,
        params: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Try search with expanded time range.

        ``start_time`` is moved 30 days earlier (or to 90 days ago if it
        cannot be parsed) and ``end_time`` 7 days later (or to now if it
        cannot be parsed). Keys that are absent are left absent.
        """
        relaxed_params = params.copy()
        now = self._utc_now()

        if "start_time" in relaxed_params:
            relaxed_params["start_time"] = self._shift_time(
                relaxed_params["start_time"],
                -timedelta(days=30),
                # If parsing fails, use a very broad range.
                now - timedelta(days=90),
            )

        if "end_time" in relaxed_params:
            relaxed_params["end_time"] = self._shift_time(
                relaxed_params["end_time"],
                timedelta(days=7),
                now,
            )

        return await search_function(**relaxed_params)

    async def _search_without_time_filter(
        self,
        search_function: Callable,
        params: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Try search without time filters.

        ``start_time`` and ``end_time`` are dropped and ``limit`` is capped
        at 50 to bound the result size of the unfiltered query.
        """
        no_time_params = params.copy()
        no_time_params.pop("start_time", None)
        no_time_params.pop("end_time", None)

        # Limit results when removing time filters. A missing or ``None``
        # limit is treated as the default of 100 before capping.
        requested_limit = no_time_params.get("limit")
        if requested_limit is None:
            requested_limit = 100
        no_time_params["limit"] = min(requested_limit, 50)

        return await search_function(**no_time_params)

    async def _search_recent_data_only(
        self,
        search_function: Callable,
        params: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Try search with only recent data (last 24 hours)."""
        recent_params = params.copy()
        now = self._utc_now()
        recent_params["start_time"] = (now - timedelta(hours=24)).isoformat()
        recent_params["end_time"] = now.isoformat()

        return await search_function(**recent_params)

    async def _get_table_count(self, table_name: str) -> Dict[str, Any]:
        """Get simple record count to verify data exists.

        Returns:
            ``{"count": int, "table_has_data": bool}``, plus an ``"error"``
            string when the query failed. Never raises.
        """
        try:
            count_query = f"""
            SELECT COUNT(*) as record_count
            FROM {self._qualified_table(table_name)}
            LIMIT 1
            """

            result = await self.athena_client.execute_query(count_query, [])
            if result:
                count = int(result[0].get("record_count", 0))
                return {"count": count, "table_has_data": count > 0}
            return {"count": 0, "table_has_data": False}

        except Exception as e:
            logger.error("Failed to get table count", table=table_name, error=str(e))
            return {"count": 0, "table_has_data": False, "error": str(e)}

    async def _get_diagnostic_info(
        self,
        table_name: str,
        search_params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Get diagnostic information about the table and search parameters.

        Collects the table record count, the earliest/latest ``time`` values
        and, when a ``severity`` filter was supplied, the most common
        severity values. Any failure is recorded under ``"diagnostic_error"``
        and whatever was gathered so far is still returned.
        """
        diagnostics: Dict[str, Any] = {
            "table_name": table_name,
            "search_parameters": search_params,
        }

        try:
            table_ref = self._qualified_table(table_name)

            # Get table record count
            count_result = await self._get_table_count(table_name)
            diagnostics["table_record_count"] = count_result.get("count", 0)
            diagnostics["table_has_data"] = count_result.get("table_has_data", False)

            # Get data freshness info
            freshness_query = f"""
            SELECT
                MAX(time) as latest_record,
                MIN(time) as earliest_record,
                COUNT(DISTINCT DATE(time)) as days_with_data
            FROM {table_ref}
            WHERE time IS NOT NULL
            LIMIT 1
            """

            freshness_result = await self.athena_client.execute_query(freshness_query, [])
            if freshness_result:
                row = freshness_result[0]
                diagnostics["data_freshness"] = {
                    "latest_record": row.get("latest_record"),
                    "earliest_record": row.get("earliest_record"),
                    "days_with_data": row.get("days_with_data"),
                }

            # Check if specific filters might be too restrictive
            if search_params.get("severity"):
                severity_query = f"""
                SELECT severity, COUNT(*) as count
                FROM {table_ref}
                WHERE severity IS NOT NULL
                GROUP BY severity
                ORDER BY count DESC
                LIMIT 10
                """

                severity_result = await self.athena_client.execute_query(severity_query, [])
                diagnostics["available_severities"] = severity_result

        except Exception as e:
            diagnostics["diagnostic_error"] = str(e)
            logger.error("Failed to get diagnostic info", error=str(e))

        return diagnostics

    def _generate_search_suggestions(self, diagnostics: Dict[str, Any]) -> List[str]:
        """Generate actionable suggestions based on diagnostic information.

        Args:
            diagnostics: Mapping shaped like the return value of
                ``_get_diagnostic_info``; missing keys are tolerated.

        Returns:
            Human-readable suggestion strings, most specific first.
        """
        suggestions: List[str] = []

        table_count = diagnostics.get("table_record_count", 0) or 0
        if table_count == 0:
            suggestions.append("Table appears to be empty - verify data ingestion is working")
            suggestions.append("Check if Security Lake is properly configured and receiving data")
        elif table_count > 0:
            suggestions.append(f"Table contains {table_count} records but search filters may be too restrictive")

            # Time range suggestions
            freshness = diagnostics.get("data_freshness") or {}
            latest = freshness.get("latest_record")
            earliest = freshness.get("earliest_record")

            if latest and earliest:
                suggestions.append(f"Data available from {earliest} to {latest}")
                suggestions.append("Try expanding your time range or removing time filters")

            # Severity suggestions
            available_severities = diagnostics.get("available_severities") or []
            if available_severities:
                # ``str()`` because severity values may come back as
                # non-string types, which ``str.join`` would reject.
                severity_list = [
                    str(s.get("severity"))
                    for s in available_severities
                    if isinstance(s, dict) and s.get("severity")
                ]
                suggestions.append(f"Available severity levels: {', '.join(severity_list)}")

        suggestions.append("Try removing specific filters and gradually add them back")
        suggestions.append("Use the 'list_data_sources' tool to verify table schema")

        return suggestions

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kebabmane/asl-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.