
batch_index_directory

Process documents from a directory into Elasticsearch with AI-enhanced metadata generation, supporting batch indexing and comprehensive file handling.

Instructions

Batch index all documents from a directory into Elasticsearch with AI-enhanced metadata generation and comprehensive file processing

Input Schema

| Name               | Required | Description                                                           | Default        |
|--------------------|----------|-----------------------------------------------------------------------|----------------|
| index              | Yes      | Name of the Elasticsearch index to store the documents                | (none)         |
| directory_path     | Yes      | Path to directory containing documents to index                       | (none)         |
| file_pattern       | No       | File pattern to match (e.g., '*.md', '*.txt', '*')                    | *.md           |
| validate_schema    | No       | Whether to validate document structure for knowledge base format      | true           |
| recursive          | No       | Whether to search subdirectories recursively                          | true           |
| skip_existing      | No       | Skip files that already exist in the index (checked by filename)      | false          |
| max_file_size      | No       | Maximum file size in bytes to process (1 to 10485760)                 | 1048576 (1 MB) |
| use_ai_enhancement | No       | Use AI to generate intelligent tags and key points for each document  | true           |
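
As a usage illustration (not part of the source), here is a minimal sketch of calling the tool with these parameters through FastMCP's in-memory client. The import path, index name, and directory are hypothetical; the unified server object is the app shown in the Implementation Reference below.

    import asyncio
    from fastmcp import Client

    from agent_knowledge_mcp.elasticsearch_server import app  # hypothetical import path for the unified server

    async def main():
        # In-memory transport: the Client talks to the FastMCP server object directly
        async with Client(app) as client:
            result = await client.call_tool(
                "batch_index_directory",
                {
                    "index": "knowledge_base",     # hypothetical index name
                    "directory_path": "./docs",    # hypothetical directory
                    "file_pattern": "*.md",
                    "recursive": True,
                    "skip_existing": True,
                    "use_ai_enhancement": False,   # basic metadata only, for a faster first pass
                },
            )
            print(result)

    asyncio.run(main())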

Implementation Reference

  • Complete handler implementation for batch_index_directory tool: directory traversal, file filtering/reading, AI metadata generation, document validation, Elasticsearch indexing with comprehensive error handling and reporting.
    @app.tool(
        description="Batch index all documents from a directory into Elasticsearch with AI-enhanced metadata generation and comprehensive file processing",
        tags={"elasticsearch", "batch", "directory", "index", "bulk", "ai-enhanced"}
    )
    async def batch_index_directory(
        index: Annotated[str, Field(description="Name of the Elasticsearch index to store the documents")],
        directory_path: Annotated[str, Field(description="Path to directory containing documents to index")],
        file_pattern: Annotated[str, Field(description="File pattern to match (e.g., '*.md', '*.txt', '*')")] = "*.md",
        validate_schema: Annotated[bool, Field(description="Whether to validate document structure for knowledge base format")] = True,
        recursive: Annotated[bool, Field(description="Whether to search subdirectories recursively")] = True,
        skip_existing: Annotated[bool, Field(description="Skip files that already exist in index (check by filename)")] = False,
        max_file_size: Annotated[int, Field(description="Maximum file size in bytes to process", ge=1, le=10485760)] = 1048576,  # 1MB default
        use_ai_enhancement: Annotated[bool, Field(description="Use AI to generate intelligent tags and key points for each document")] = True,
        ctx: Context = None
    ) -> str:
        """Batch index all documents from a directory into Elasticsearch."""
        try:
            from pathlib import Path
            import os

            # Check directory exists and is valid
            directory = Path(directory_path)
            if not directory.exists():
                return f"❌ Directory not found: {directory_path}\nπŸ’‘ Check the directory path spelling and location"
            if not directory.is_dir():
                return f"❌ Path is not a directory: {directory_path}\nπŸ’‘ Provide a directory path, not a file path"

            # Get Elasticsearch client
            es = get_es_client()

            # Find all matching files
            if recursive:
                files = list(directory.rglob(file_pattern))
            else:
                files = list(directory.glob(file_pattern))

            if not files:
                return f"❌ No files found matching pattern '{file_pattern}' in directory: {directory_path}\nπŸ’‘ Try a different file pattern like '*.txt', '*.json', or '*'"

            # Filter out files that are too large
            valid_files = []
            skipped_size = []
            for file_path in files:
                if file_path.is_file():
                    try:
                        file_size = file_path.stat().st_size
                        if file_size <= max_file_size:
                            valid_files.append(file_path)
                        else:
                            skipped_size.append((file_path, file_size))
                    except Exception as e:
                        # Skip files we can't stat
                        continue

            if not valid_files:
                return f"❌ No valid files found (all files too large or inaccessible)\nπŸ’‘ Increase max_file_size or check file permissions"

            # Check for existing documents if skip_existing is True
            existing_docs = set()
            if skip_existing:
                try:
                    # Search for existing documents by titles
                    search_body = {
                        "query": {"match_all": {}},
                        "size": 10000,  # Get many docs to check
                        "_source": ["title", "id"]
                    }
                    existing_result = es.search(index=index, body=search_body)
                    for hit in existing_result['hits']['hits']:
                        source = hit.get('_source', {})
                        if 'title' in source:
                            existing_docs.add(source['title'])
                        if 'id' in source:
                            existing_docs.add(source['id'])
                except Exception:
                    # If we can't check existing docs, proceed anyway
                    pass

            # Process files
            successful = []
            failed = []
            skipped_existing = []

            for file_path in valid_files:
                try:
                    file_name = file_path.name
                    # Handle files with multiple dots properly (e.g., .post.md, .get.md)
                    clean_stem = file_path.name
                    if file_path.suffix:
                        clean_stem = file_path.name[:-len(file_path.suffix)]
                    title = clean_stem.replace('_', ' ').replace('-', ' ').replace('.', ' ').title()

                    # Skip if document with same title already exists in index
                    if skip_existing and title in existing_docs:
                        skipped_existing.append(file_name)
                        continue

                    # Read file content
                    try:
                        with open(file_path, 'r', encoding='utf-8') as f:
                            content = f.read()
                    except UnicodeDecodeError:
                        # Try with different encodings
                        try:
                            with open(file_path, 'r', encoding='latin-1') as f:
                                content = f.read()
                        except Exception as e:
                            failed.append((file_name, f"Encoding error: {str(e)}"))
                            continue
                    except Exception as e:
                        failed.append((file_name, f"Read error: {str(e)}"))
                        continue

                    # Create document from file
                    relative_path = file_path.relative_to(directory)
                    # Handle files with multiple dots (e.g., .post.md, .get.md) by using the full name without final extension
                    clean_stem = file_path.name
                    if file_path.suffix:
                        clean_stem = file_path.name[:-len(file_path.suffix)]
                    doc_id = f"{clean_stem.replace('.', '_')}_{hash(str(relative_path)) % 100000}"  # Create unique ID

                    # Initialize basic tags and key points
                    base_tags = [
                        "batch-indexed",
                        file_path.suffix[1:] if file_path.suffix else "no-extension",
                        directory.name
                    ]
                    base_key_points = [
                        f"Content length: {len(content)} characters",
                        f"Source directory: {directory.name}"
                    ]

                    final_tags = base_tags.copy()
                    final_key_points = base_key_points.copy()
                    final_summary = f"Document from {file_name}"

                    # Use AI enhancement if requested and context is available
                    if use_ai_enhancement and ctx and content.strip():
                        try:
                            await ctx.info(f"πŸ€– Generating AI metadata and smart content for: {file_name}")
                            ai_metadata = await generate_smart_metadata(title, content, ctx)

                            # Merge AI-generated tags with base tags
                            ai_tags = ai_metadata.get("tags", [])
                            for tag in ai_tags:
                                if tag not in final_tags:
                                    final_tags.append(tag)

                            # Merge AI-generated key points with base points
                            ai_key_points = ai_metadata.get("key_points", [])
                            for point in ai_key_points:
                                if point not in final_key_points:
                                    final_key_points.append(point)

                            # Use AI-generated smart summary and enhanced content
                            ai_summary = ai_metadata.get("smart_summary", "")
                            ai_enhanced_content = ai_metadata.get("enhanced_content", "")

                            if ai_summary:
                                final_summary = ai_summary
                            elif len(content) > 100:
                                # Fallback to content preview if no AI summary
                                content_preview = content[:300].strip()
                                if content_preview:
                                    final_summary = content_preview + ("..." if len(content) > 300 else "")

                            # Use enhanced content if available and substantially different
                            if ai_enhanced_content and len(ai_enhanced_content) > len(content) * 0.8:
                                content = ai_enhanced_content
                        except Exception as e:
                            await ctx.warning(f"AI enhancement failed for {file_name}: {str(e)}")

                    document = {
                        "id": doc_id,
                        "title": title,
                        "summary": final_summary,
                        "content": content,
                        "last_modified": datetime.now().isoformat(),
                        "priority": "medium",
                        "tags": final_tags,
                        "related": [],
                        "source_type": "documentation",
                        "key_points": final_key_points
                    }

                    # Validate document if requested
                    if validate_schema:
                        try:
                            validated_doc = validate_document_structure(document)
                            document = validated_doc
                        except DocumentValidationError as e:
                            failed.append((file_name, f"Validation error: {str(e)}"))
                            continue
                        except Exception as e:
                            failed.append((file_name, f"Validation error: {str(e)}"))
                            continue

                    # Index the document
                    try:
                        result = es.index(index=index, id=doc_id, body=document)
                        successful.append((file_name, doc_id, result.get('result', 'unknown')))
                    except Exception as e:
                        failed.append((file_name, f"Indexing error: {str(e)}"))
                        continue

                except Exception as e:
                    failed.append((file_path.name, f"Processing error: {str(e)}"))
                    continue

            # Build result summary
            total_processed = len(successful) + len(failed) + len(skipped_existing)
            result_summary = f"βœ… Batch indexing completed for directory: {directory_path}\n\n"

            # Summary statistics
            result_summary += f"πŸ“Š **Processing Summary**:\n"
            result_summary += f" πŸ“ Directory: {directory_path}\n"
            result_summary += f" πŸ” Pattern: {file_pattern} (recursive: {recursive})\n"
            result_summary += f" πŸ“„ Files found: {len(files)}\n"
            result_summary += f" βœ… Successfully indexed: {len(successful)}\n"
            result_summary += f" ❌ Failed: {len(failed)}\n"
            if skipped_existing:
                result_summary += f" ⏭️ Skipped (already exist): {len(skipped_existing)}\n"
            if skipped_size:
                result_summary += f" πŸ“ Skipped (too large): {len(skipped_size)}\n"
            result_summary += f" 🎯 Index: {index}\n"

            # AI Enhancement info
            if use_ai_enhancement and ctx:
                result_summary += f" πŸ€– AI Enhancement: Enabled (generated intelligent tags and key points)\n"
            else:
                result_summary += f" πŸ€– AI Enhancement: Disabled (using basic metadata)\n"
            result_summary += "\n"

            # Successful indexing details
            if successful:
                result_summary += f"βœ… **Successfully Indexed** ({len(successful)} files):\n"
                for file_name, doc_id, index_result in successful[:10]:  # Show first 10
                    result_summary += f" πŸ“„ {file_name} β†’ {doc_id} ({index_result})\n"
                if len(successful) > 10:
                    result_summary += f" ... and {len(successful) - 10} more files\n"
                result_summary += "\n"

            # Failed indexing details
            if failed:
                result_summary += f"❌ **Failed to Index** ({len(failed)} files):\n"
                for file_name, error_msg in failed[:5]:  # Show first 5 errors
                    result_summary += f" πŸ“„ {file_name}: {error_msg}\n"
                if len(failed) > 5:
                    result_summary += f" ... and {len(failed) - 5} more errors\n"
                result_summary += "\n"

            # Skipped files details
            if skipped_existing:
                result_summary += f"⏭️ **Skipped (Already Exist)** ({len(skipped_existing)} files):\n"
                for file_name in skipped_existing[:5]:
                    result_summary += f" πŸ“„ {file_name}\n"
                if len(skipped_existing) > 5:
                    result_summary += f" ... and {len(skipped_existing) - 5} more files\n"
                result_summary += "\n"

            if skipped_size:
                result_summary += f"πŸ“ **Skipped (Too Large)** ({len(skipped_size)} files):\n"
                for file_path, file_size in skipped_size[:3]:
                    size_mb = file_size / 1048576
                    result_summary += f" πŸ“„ {file_path.name}: {size_mb:.1f} MB\n"
                if len(skipped_size) > 3:
                    result_summary += f" ... and {len(skipped_size) - 3} more large files\n"
                result_summary += f" πŸ’‘ Increase max_file_size to include these files\n\n"

            # Performance tips
            if len(successful) > 0:
                result_summary += f"πŸš€ **Performance Tips for Future Batches**:\n"
                result_summary += f" πŸ”„ Use skip_existing=True to avoid reindexing\n"
                result_summary += f" πŸ“‚ Process subdirectories separately for better control\n"
                result_summary += f" πŸ” Use specific file patterns (*.md, *.txt) for faster processing\n"
                result_summary += f" πŸ“ Adjust max_file_size based on your content needs\n"
                if use_ai_enhancement:
                    result_summary += f" πŸ€– AI enhancement adds ~2-3 seconds per file but greatly improves metadata quality\n"
                    result_summary += f" ⚑ Set use_ai_enhancement=False for faster processing with basic metadata\n"
                else:
                    result_summary += f" πŸ€– Enable use_ai_enhancement=True for intelligent tags and key points\n"
                result_summary += "\n"

            # Knowledge base recommendations
            if len(successful) > 20:
                result_summary += f"🧹 **Knowledge Base Organization Recommendation**:\n"
                result_summary += f" πŸ“Š You've indexed {len(successful)} documents from this batch\n"
                result_summary += f" πŸ’‘ Consider organizing them by topics or themes\n"
                result_summary += f" πŸ” Use the 'search' tool to find related documents for consolidation\n"
                result_summary += f" 🎯 Group similar content to improve knowledge base quality\n"

            return result_summary

        except Exception as e:
            error_message = "❌ Batch indexing failed:\n\n"
            error_str = str(e).lower()

            if "connection" in error_str or "refused" in error_str:
                error_message += "πŸ”Œ **Connection Error**: Cannot connect to Elasticsearch server\n"
                error_message += f"πŸ“ Check if Elasticsearch is running at the configured address\n"
                error_message += f"πŸ’‘ Try: Use 'setup_elasticsearch' tool to start Elasticsearch\n\n"
            elif ("index" in error_str and "not found" in error_str) or "index_not_found_exception" in error_str:
                error_message += f"πŸ“ **Index Error**: Index '{index}' does not exist\n"
                error_message += f"πŸ“ The target index has not been created yet\n"
                error_message += f"πŸ’‘ Try: Use 'create_index' tool to create the index first\n\n"
            elif "permission" in error_str or "access denied" in error_str:
                error_message += f"πŸ”’ **Permission Error**: Access denied to directory or files\n"
                error_message += f"πŸ“ Insufficient permissions to read directory or files\n"
                error_message += f"πŸ’‘ Try: Check directory permissions or verify file access rights\n\n"
            else:
                error_message += f"⚠️ **Unknown Error**: {str(e)}\n\n"

            error_message += f"πŸ” **Technical Details**: {str(e)}"
            return error_message
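  • Example (not part of the source): a sketch of the per-file document body the handler above produces, mirroring its title/ID derivation; the sample path and values are hypothetical.
    from datetime import datetime
    from pathlib import Path

    # Hypothetical source file inside the batch directory
    directory = Path("docs")
    file_path = Path("docs/api/users.post.md")

    # Title and ID derivation mirrors the handler: strip the final suffix,
    # normalize separators, and append a hash of the relative path
    clean_stem = file_path.name[:-len(file_path.suffix)]  # "users.post"
    title = clean_stem.replace('_', ' ').replace('-', ' ').replace('.', ' ').title()  # "Users Post"
    doc_id = f"{clean_stem.replace('.', '_')}_{hash(str(file_path.relative_to(directory))) % 100000}"

    document = {
        "id": doc_id,
        "title": title,
        "summary": "Document from users.post.md",  # replaced by the AI summary when enhancement succeeds
        "content": "...file content...",
        "last_modified": datetime.now().isoformat(),
        "priority": "medium",
        "tags": ["batch-indexed", "md", "docs"],   # base tags; AI-generated tags are appended when available
        "related": [],
        "source_type": "documentation",
        "key_points": ["Content length: 1234 characters", "Source directory: docs"],
    }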
  • Input schema defined using Pydantic Annotated[Field] for all parameters including descriptions, defaults, and constraints.
    async def batch_index_directory(
        index: Annotated[str, Field(description="Name of the Elasticsearch index to store the documents")],
        directory_path: Annotated[str, Field(description="Path to directory containing documents to index")],
        file_pattern: Annotated[str, Field(description="File pattern to match (e.g., '*.md', '*.txt', '*')")] = "*.md",
        validate_schema: Annotated[bool, Field(description="Whether to validate document structure for knowledge base format")] = True,
        recursive: Annotated[bool, Field(description="Whether to search subdirectories recursively")] = True,
        skip_existing: Annotated[bool, Field(description="Skip files that already exist in index (check by filename)")] = False,
        max_file_size: Annotated[int, Field(description="Maximum file size in bytes to process", ge=1, le=10485760)] = 1048576,  # 1MB default
        use_ai_enhancement: Annotated[bool, Field(description="Use AI to generate intelligent tags and key points for each document")] = True,
        ctx: Context = None
    ) -> str:
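  • Example (not part of the source): a minimal sketch of how the max_file_size constraint (ge=1, le=10485760) behaves, exercised with Pydantic v2's TypeAdapter outside of FastMCP.
    from typing import Annotated

    from pydantic import Field, TypeAdapter, ValidationError

    # Same constraint as the max_file_size parameter: between 1 byte and 10 MiB
    MaxFileSize = Annotated[int, Field(ge=1, le=10485760)]
    adapter = TypeAdapter(MaxFileSize)

    print(adapter.validate_python(1048576))  # accepted: the 1 MB default

    try:
        adapter.validate_python(50_000_000)  # rejected before the tool body ever runs
    except ValidationError as exc:
        print(exc.errors()[0]["type"])       # "less_than_equal"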
  • Registration via import and app.mount() of the batch sub-server into the main unified Elasticsearch FastMCP server, exposing batch_index_directory as part of the 16-tool interface.
    from .sub_servers.elasticsearch_batch import app as batch_app

    # Create unified FastMCP application
    app = FastMCP(
        name="AgentKnowledgeMCP-Elasticsearch",
        version="2.0.0",
        instructions="Unified Elasticsearch tools for comprehensive knowledge management via modular server mounting"
    )

    # ================================
    # SERVER MOUNTING - MODULAR ARCHITECTURE
    # ================================
    print("πŸ—οΈ Mounting Elasticsearch sub-servers...")

    # Mount all sub-servers into unified interface
    app.mount(snapshots_app)       # 3 tools: snapshot management
    app.mount(index_metadata_app)  # 3 tools: metadata governance
    app.mount(document_app)        # 3 tools: document operations
    app.mount(index_app)           # 3 tools: index management
    app.mount(search_app)          # 2 tools: search & validation
    app.mount(batch_app)           # 2 tools: batch operations
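  • Example (not part of the source): a minimal sketch that lists the mounted tools with FastMCP's in-memory Client, assuming the unified app object created above is importable.
    import asyncio
    from fastmcp import Client

    async def list_mounted_tools(app):
        # Connect directly to the server object (no network transport required)
        async with Client(app) as client:
            tools = await client.list_tools()
            for tool in tools:
                print(tool.name)  # batch_index_directory should appear among the 16 tools

    # asyncio.run(list_mounted_tools(app))  # run against the unified app created above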
  • Local tool registration on the elasticsearch_batch FastMCP app using @app.tool decorator.
    @app.tool(
        description="Batch index all documents from a directory into Elasticsearch with AI-enhanced metadata generation and comprehensive file processing",
        tags={"elasticsearch", "batch", "directory", "index", "bulk", "ai-enhanced"}
    )
