search
Query Elasticsearch indices with advanced filters, pagination, and time-based sorting to locate specific documents efficiently. Ideal for targeted data retrieval and analysis.
Instructions
Search documents in Elasticsearch index with advanced filtering, pagination, and time-based sorting capabilities
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| date_from | No | Start date filter in ISO format (YYYY-MM-DD) | |
| date_to | No | End date filter in ISO format (YYYY-MM-DD) | |
| fields | No | Specific fields to include in search results | |
| index | Yes | Name of the Elasticsearch index to search | |
| query | Yes | Search query text to find matching documents | |
| size | No | Maximum number of results to return (1–1000) | 10 |
| sort_by_time | No | Sort order by timestamp | desc |
| time_period | No | Predefined time period filter (e.g., '7d', '1m', '1y') | |
Implementation Reference
# Core handler function implementing the 'search' tool. Performs Elasticsearch
# search with multi-match queries, time filtering, sorting, result formatting,
# reorganization analysis, and agent-friendly guidance messages.
@app.tool(
    description="Search documents in Elasticsearch index with advanced filtering, pagination, and time-based sorting capabilities",
    tags={"elasticsearch", "search", "query"}
)
async def search(
    index: Annotated[str, Field(description="Name of the Elasticsearch index to search")],
    query: Annotated[str, Field(description="Search query text to find matching documents")],
    size: Annotated[int, Field(description="Maximum number of results to return", ge=1, le=1000)] = 10,
    fields: Annotated[Optional[List[str]], Field(description="Specific fields to include in search results")] = None,
    date_from: Annotated[Optional[str], Field(description="Start date filter in ISO format (YYYY-MM-DD)")] = None,
    date_to: Annotated[Optional[str], Field(description="End date filter in ISO format (YYYY-MM-DD)")] = None,
    time_period: Annotated[Optional[str], Field(description="Predefined time period filter (e.g., '7d', '1m', '1y')")] = None,
    sort_by_time: Annotated[str, Field(description="Sort order by timestamp", pattern="^(asc|desc)$")] = "desc"
) -> str:
    """Search documents in Elasticsearch index with optional time-based filtering.

    Runs a boosted ``multi_match`` query over the title/summary/content/tags/
    features/tech_stack fields. When ``parse_time_parameters`` yields a range
    filter, results are sorted by ``last_modified`` (in ``sort_by_time`` order)
    and then by relevance; otherwise by relevance and then newest first.

    Args:
        index: Index to search.
        query: Free-text query.
        size: Maximum number of hits to return.
        fields: Optional ``_source`` field whitelist.
        date_from: Optional lower bound for ``last_modified``.
        date_to: Optional upper bound for ``last_modified``.
        time_period: Optional time-period shortcut; takes precedence over the
            explicit dates in the filter description.
        sort_by_time: ``"asc"`` or ``"desc"``; only used when a time filter applies.

    Returns:
        A human-readable string: guidance text followed by a JSON document with
        ``total`` and ``results``, or a suggestions message when nothing
        matched, or a categorized error message if the search raised.
        This function never raises; every exception is turned into text.
    """
    try:
        es = get_es_client()

        # Parse time filters (None when no usable time constraint was given)
        time_filter = parse_time_parameters(date_from, date_to, time_period)

        # The text query is the same with or without time filtering.
        text_query = {
            "multi_match": {
                "query": query,
                "fields": ["title^3", "summary^2", "content", "tags^2", "features^2", "tech_stack^2"]
            }
        }
        if time_filter:
            # Combine text search with time filtering
            search_body = {"query": {"bool": {"must": [text_query], "filter": [time_filter]}}}
        else:
            # Standard text search without time filtering
            search_body = {"query": text_query}

        # "unmapped_type" keeps the sort from failing on indices whose mapping
        # has no last_modified field; such documents simply sort as missing.
        if time_filter:
            # Time filtering in effect: time is the primary key, relevance secondary.
            search_body["sort"] = [
                {"last_modified": {"order": sort_by_time, "unmapped_type": "date"}},
                "_score"
            ]
        else:
            # Default sorting: relevance first, then recency (always newest first).
            search_body["sort"] = [
                "_score",
                {"last_modified": {"order": "desc", "unmapped_type": "date"}}
            ]

        search_body["size"] = size
        if fields:
            search_body["_source"] = fields

        # NOTE(review): this looks like a blocking client call inside an async
        # handler — confirm whether get_es_client() returns a sync client.
        result = es.search(index=index, body=search_body)

        # Build time filter description early for use in all branches
        time_filter_desc = ""
        if time_filter:
            if time_period:
                time_filter_desc = f" (filtered by: {time_period})"
            elif date_from or date_to:
                filter_parts = []
                if date_from:
                    filter_parts.append(f"from {date_from}")
                if date_to:
                    filter_parts.append(f"to {date_to}")
                time_filter_desc = f" (filtered by: {' '.join(filter_parts)})"

        # Format results; "_source" can be absent when source fetching is disabled.
        formatted_results = [
            {"id": hit['_id'], "score": hit['_score'], "source": hit.get('_source', {})}
            for hit in result['hits']['hits']
        ]

        # hits.total is an object ({"value": N, ...}) on current servers but a
        # plain integer on older ones or with rest_total_hits_as_int.
        raw_total = result['hits']['total']
        total_results = raw_total['value'] if isinstance(raw_total, dict) else raw_total

        # Check if no results found and provide helpful suggestions
        if total_results == 0:
            time_suggestions = ""
            if time_filter:
                time_suggestions = (
                    "\n\n⏰ **Time Filter Suggestions**:\n"
                    " • Try broader time range (expand dates or use 'month'/'year')\n"
                    " • Remove time filters to search all documents\n"
                    " • Check if documents exist in the specified time period\n"
                    " • Use relative dates like '30d' or '6m' for wider ranges\n"
                )

            return (
                f"🔍 No results found for '{query}' in index '{index}'{time_filter_desc}\n\n"
                "💡 **Search Optimization Suggestions for Agents**:\n\n"
                "📂 **Try Other Indices**:\n"
                " • Use 'list_indices' tool to see all available indices\n"
                " • Search the same query in different indices\n"
                " • Content might be stored in a different index\n"
                " • Check indices with similar names or purposes\n\n"
                "🎯 **Try Different Keywords**:\n"
                " • Use synonyms and related terms\n"
                " • Try shorter, more general keywords\n"
                " • Break complex queries into simpler parts\n"
                " • Use different language variations if applicable\n\n"
                "📅 **Consider Recency**:\n"
                " • Recent documents may use different terminology\n"
                " • Try searching with current date/time related terms\n"
                " • Look for latest trends or recent updates\n"
                " • Use time_period='month' or 'year' for broader time searches\n\n"
                "🤝 **Ask User for Help**:\n"
                " • Request user to suggest related keywords\n"
                " • Ask about specific topics or domains they're interested in\n"
                " • Get context about what they're trying to find\n"
                " • Ask for alternative ways to describe their query\n\n"
                "🔧 **Technical Tips**:\n"
                " • Use broader search terms first, then narrow down\n"
                " • Check for typos in search terms\n"
                " • Consider partial word matches\n"
                " • Try fuzzy matching or wildcard searches"
                + time_suggestions
            )

        # Add detailed reorganization analysis for too many results
        reorganization_analysis = analyze_search_results_for_reorganization(formatted_results, query, total_results)

        # Describe the sort order actually sent to Elasticsearch, so the
        # guidance text below never contradicts the result header.
        if time_filter:
            sort_desc = f"sorted by time ({sort_by_time}) then relevance"
            results_info = f"Sorted by time ({sort_by_time}) first, then by relevance"
        else:
            sort_desc = "sorted by relevance and recency"
            results_info = "Sorted by relevance first, then by recency"

        # Build guidance messages that will appear BEFORE results
        guidance_messages = ""

        # Limited results guidance (1-3 matches)
        if 0 < total_results <= 3:
            guidance_messages += (
                f"💡 **Limited Results Found** ({total_results} matches):\n"
                " 📂 **Check Other Indices**: Use 'list_indices' tool to see all available indices\n"
                " 🔍 **Search elsewhere**: Try the same query in different indices\n"
                " 🎯 **Expand keywords**: Try broader or alternative keywords for more results\n"
                " 🤝 **Ask user**: Request related terms or different perspectives\n"
                f" 📊 **Results info**: {results_info}"
                + ("\n ⏰ **Time range**: Consider broader time range if using time filters" if time_filter else "")
                + "\n\n"
            )

        # Too many results guidance (15+ matches)
        if total_results > 15:
            guidance_messages += (
                f"🧹 **Too Many Results Found** ({total_results} matches):\n"
                " 📊 **Consider Knowledge Base Reorganization**:\n"
                " • Ask user: 'Would you like to organize the knowledge base better?'\n"
                " • List key topics found in search results\n"
                " • Ask user to confirm which topics to consolidate/update/delete\n"
                " • Suggest merging similar documents into comprehensive ones\n"
                " • Propose archiving outdated/redundant information\n"
                " 🎯 **User Collaboration Steps**:\n"
                f" 1. 'I found {total_results} documents about this topic'\n"
                " 2. 'Would you like me to help organize them better?'\n"
                " 3. List main themes/topics from results\n"
                " 4. Get user confirmation for reorganization plan\n"
                " 5. Execute: consolidate, update, or delete as agreed\n"
                " 💡 **Quality Goals**: Fewer, better organized, comprehensive documents"
                + ("\n • Consider narrower time range to reduce results" if time_filter else "")
                + "\n\n"
            )

        # Add reorganization analysis if present
        if reorganization_analysis:
            guidance_messages += reorganization_analysis + "\n\n"

        return (
            guidance_messages
            + f"Search results for '{query}' in index '{index}'{time_filter_desc} ({sort_desc}):\n\n"
            + json.dumps({
                "total": total_results,
                "results": formatted_results
            }, indent=2, ensure_ascii=False)
        )
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        # The category is guessed from substrings of the exception message,
        # checked in this order; the first match wins.
        error_message = "❌ Search failed:\n\n"
        error_str = str(e).lower()
        if "connection" in error_str or "refused" in error_str:
            error_message += "🔌 **Connection Error**: Cannot connect to Elasticsearch server\n"
            error_message += "📍 Check if Elasticsearch is running at the configured address\n"
            error_message += "💡 Try: Use 'setup_elasticsearch' tool to start Elasticsearch\n\n"
        elif ("index" in error_str and "not found" in error_str) or "index_not_found_exception" in error_str or "no such index" in error_str:
            error_message += f"📁 **Index Error**: Index '{index}' does not exist\n"
            error_message += "📍 The search index has not been created yet\n"
            error_message += "💡 **Suggestions for agents**:\n"
            error_message += " 1. Use 'list_indices' tool to see all available indices\n"
            error_message += " 2. Check which indices contain your target data\n"
            error_message += " 3. Use the correct index name from the list\n"
            error_message += " 4. If no suitable index exists, create one with 'create_index' tool\n\n"
        elif "timeout" in error_str:
            error_message += "⏱️ **Timeout Error**: Search query timed out\n"
            error_message += "📍 Query may be too complex or index too large\n"
            error_message += "💡 Try: Simplify query or reduce search size\n\n"
        elif "parse" in error_str or "query" in error_str:
            error_message += "🔍 **Query Error**: Invalid search query format\n"
            error_message += "📍 Search query syntax is not valid\n"
            error_message += "💡 Try: Use simpler search terms\n\n"
        else:
            error_message += f"⚠️ **Unknown Error**: {str(e)}\n\n"
        error_message += f"🔍 **Technical Details**: {str(e)}"
        return error_message
- Input schema using Pydantic v2 Annotated types and Field validators/descriptions for the search tool parameters.async def search( index: Annotated[str, Field(description="Name of the Elasticsearch index to search")], query: Annotated[str, Field(description="Search query text to find matching documents")], size: Annotated[int, Field(description="Maximum number of results to return", ge=1, le=1000)] = 10, fields: Annotated[Optional[List[str]], Field(description="Specific fields to include in search results")] = None, date_from: Annotated[Optional[str], Field(description="Start date filter in ISO format (YYYY-MM-DD)")] = None, date_to: Annotated[Optional[str], Field(description="End date filter in ISO format (YYYY-MM-DD)")] = None, time_period: Annotated[Optional[str], Field(description="Predefined time period filter (e.g., '7d', '1m', '1y')")] = None, sort_by_time: Annotated[str, Field(description="Sort order by timestamp", pattern="^(asc|desc)$")] = "desc" ) -> str:
# src/elasticsearch/elasticsearch_server.py:29-50 (registration)
# Registers the search sub-server by importing its FastMCP app and mounting it
# into the unified Elasticsearch server app.
# This is an excerpt: the imports of snapshots_app, index_metadata_app,
# document_app, index_app and FastMCP are outside the lines shown here.
from .sub_servers.elasticsearch_search import app as search_app
from .sub_servers.elasticsearch_batch import app as batch_app

# Create unified FastMCP application
app = FastMCP(
    name="AgentKnowledgeMCP-Elasticsearch",
    version="2.0.0",
    instructions="Unified Elasticsearch tools for comprehensive knowledge management via modular server mounting"
)

# ================================
# SERVER MOUNTING - MODULAR ARCHITECTURE
# ================================
print("🏗️ Mounting Elasticsearch sub-servers...")

# Mount all sub-servers into unified interface.
# No prefix argument is passed to mount() in any of these calls.
# NOTE(review): batch_app is imported above but not mounted within this
# excerpt — confirm it is mounted in the lines that follow.
app.mount(snapshots_app)  # 3 tools: snapshot management
app.mount(index_metadata_app)  # 3 tools: metadata governance
app.mount(document_app)  # 3 tools: document operations
app.mount(index_app)  # 3 tools: index management
app.mount(search_app)  # 2 tools: search & validation
# src/main_server.py:19-76 (registration)
# Mounts the full elasticsearch_server_app (including search tool) into the
# main AgentKnowledgeMCP FastMCP server, providing backward-compatible
# unprefixed access to 'search'.
# This is an excerpt: load_config, init_security, initialize_confirmation_manager,
# auto_setup_elasticsearch, init_elasticsearch, Path and FastMCP are imported
# outside the lines shown here.
from src.elasticsearch.elasticsearch_server import app as elasticsearch_server_app
from src.prompts.prompt_server import app as prompt_server_app

# Import middleware
from src.middleware.confirmation_middleware import ConfirmationMiddleware

# Load configuration and initialize components
CONFIG = load_config()
init_security(CONFIG["security"]["allowed_base_directory"])

# Initialize confirmation manager
confirmation_manager = initialize_confirmation_manager(CONFIG)
# Reports the "confirmation.enabled" setting, shown as True when the key is absent.
print(f"✅ Confirmation system initialized (enabled: {CONFIG.get('confirmation', {}).get('enabled', True)})")

# Auto-setup Elasticsearch if needed
print("🔍 Checking Elasticsearch configuration...")
config_path = Path(__file__).parent / "config.json"
setup_result = auto_setup_elasticsearch(config_path, CONFIG)

# Any status other than the three handled below is ignored silently.
if setup_result["status"] == "setup_completed":
    # Reload config after setup
    CONFIG = load_config()
    print("✅ Elasticsearch auto-setup completed")
elif setup_result["status"] == "already_configured":
    print("✅ Elasticsearch already configured")
elif setup_result["status"] == "setup_failed":
    print(f"⚠️ Elasticsearch auto-setup failed: {setup_result.get('error', 'Unknown error')}")
    print("📝 You can manually setup using the 'setup_elasticsearch' tool")

# NOTE(review): the excerpt lost its indentation; this call is assumed to run
# unconditionally after the status checks — confirm against the real file.
init_elasticsearch(CONFIG)

# Create main FastMCP server
app = FastMCP(
    name=CONFIG["server"]["name"],
    version=CONFIG["server"]["version"],
    instructions="🏗️ AgentKnowledgeMCP - Modern FastMCP server with modular composition architecture for knowledge management, Elasticsearch operations, file management, and system administration"
)

# ================================
# MIDDLEWARE CONFIGURATION
# ================================
print("🔒 Adding confirmation middleware...")

# Add confirmation middleware to main server
app.add_middleware(ConfirmationMiddleware())
print("✅ Confirmation middleware added successfully!")

# ================================
# SERVER COMPOSITION - MOUNTING
# ================================
print("🏗️ Mounting individual servers into main server...")

# Mount the Elasticsearch server. No prefix argument is passed here, so the
# tools are presumably exposed under their original names ('search' rather
# than 'es_search') — verify against the FastMCP mount() semantics in use.
app.mount(elasticsearch_server_app)
# Helper function called by search handler to parse and build Elasticsearch
# time range filters from input parameters.
def parse_time_parameters(date_from: Optional[str] = None, date_to: Optional[str] = None, time_period: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Parse time-based search parameters and return Elasticsearch date range filter.

    Args:
        date_from: Lower bound for ``last_modified``. Accepts ``'now'``, a
            relative offset such as ``'7d'``/``'2w'``/``'1m'``/``'1y'``, or one
            of the formats ``YYYY-MM-DD``, ``YYYY-MM-DD HH:MM:SS``,
            ``YYYY-MM-DDTHH:MM:SS``, ``YYYY-MM-DDTHH:MM:SSZ``.
        date_to: Upper bound, same formats. A date-only value covers the whole
            of that day (end of day), so ``date_to='2024-01-15'`` includes
            documents modified during 15 January.
        time_period: Shortcut that takes precedence over the explicit dates:
            ``'today'``, ``'yesterday'``, ``'week'``, ``'month'``, ``'year'``
            (case-insensitive) or a relative offset such as ``'7d'``. An
            unrecognized value is ignored and the explicit dates are used.

    Returns:
        A ``{"range": {"last_modified": {...}}}`` filter, or ``None`` when no
        usable time constraint was supplied.

    NOTE(review): timestamps are built from naive ``datetime.now()`` and sent
    without an offset, alongside Elasticsearch's own ``"now"`` — confirm the
    server's local time matches how ``last_modified`` values are stored.
    """
    # Approximate day counts per unit: months are 30 days, years 365 days.
    days_per_unit = {'d': 1, 'w': 7, 'm': 30, 'y': 365}

    def parse_relative_date(date_str: str) -> Optional[datetime]:
        """Parse relative date strings like '7d', '1w', '1m' to datetime."""
        if not date_str:
            return None
        match = re.match(r'(\d+)([dwmy])', date_str.lower())
        if not match:
            return None
        amount = int(match.group(1))
        try:
            return datetime.now() - timedelta(days=amount * days_per_unit[match.group(2)])
        except OverflowError:
            # Absurdly large offsets cannot be represented; treat as unparseable.
            return None

    def parse_date_string(date_str: str, end_of_day: bool = False) -> Optional[str]:
        """Parse various date formats to Elasticsearch compatible format."""
        if not date_str:
            return None
        if date_str.lower() == 'now':
            return 'now'

        # Try relative dates first
        relative_date = parse_relative_date(date_str)
        if relative_date:
            return relative_date.isoformat()

        # Try parsing standard formats
        formats = (
            '%Y-%m-%d',
            '%Y-%m-%d %H:%M:%S',
            '%Y-%m-%dT%H:%M:%S',
            '%Y-%m-%dT%H:%M:%SZ'
        )
        for fmt in formats:
            try:
                parsed_date = datetime.strptime(date_str, fmt)
            except ValueError:
                continue
            if end_of_day and fmt == '%Y-%m-%d':
                # A bare date as an upper bound means "through the end of that
                # day", mirroring how the 'yesterday' shortcut builds its end.
                parsed_date = parsed_date.replace(hour=23, minute=59, second=59, microsecond=999999)
            return parsed_date.isoformat()
        return None

    def range_filter_for(bounds: Dict[str, str]) -> Dict[str, Any]:
        """Wrap gte/lte bounds in a range filter on last_modified."""
        return {"range": {"last_modified": bounds}}

    # Handle time_period shortcuts
    if time_period:
        period = time_period.strip().lower()
        now = datetime.now()

        if period == 'today':
            start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0)
            return range_filter_for({"gte": start_of_day.isoformat(), "lte": "now"})

        if period == 'yesterday':
            yesterday = now - timedelta(days=1)
            start_of_yesterday = yesterday.replace(hour=0, minute=0, second=0, microsecond=0)
            end_of_yesterday = yesterday.replace(hour=23, minute=59, second=59, microsecond=999999)
            return range_filter_for({
                "gte": start_of_yesterday.isoformat(),
                "lte": end_of_yesterday.isoformat()
            })

        named_spans = {
            'week': timedelta(weeks=1),
            'month': timedelta(days=30),
            'year': timedelta(days=365),
        }
        if period in named_spans:
            return range_filter_for({"gte": (now - named_spans[period]).isoformat(), "lte": "now"})

        # Relative shorthand such as '7d', '1m', '1y' — the values the tool's
        # parameter description advertises for time_period.
        relative_start = parse_relative_date(period)
        if relative_start:
            return range_filter_for({"gte": relative_start.isoformat(), "lte": "now"})
        # Unrecognized period: fall through to the explicit dates, if any.

    # Handle explicit date range
    if date_from or date_to:
        bounds: Dict[str, str] = {}
        parsed_from = parse_date_string(date_from) if date_from else None
        if parsed_from:
            bounds["gte"] = parsed_from
        parsed_to = parse_date_string(date_to, end_of_day=True) if date_to else None
        if parsed_to:
            bounds["lte"] = parsed_to
        if bounds:
            return range_filter_for(bounds)

    return None