
Agent Knowledge MCP

batch_index_directory

Batch-indexes documents from a directory into Elasticsearch, generating AI-enhanced metadata for each file and handling file discovery, filtering, encoding fallbacks, and error reporting.

Instructions

Batch index all documents from a directory into Elasticsearch with AI-enhanced metadata generation and comprehensive file processing

Input Schema

| Name | Required | Description | Default |
|------|----------|-------------|---------|
| index | Yes | Name of the Elasticsearch index to store the documents | — |
| directory_path | Yes | Path to directory containing documents to index | — |
| file_pattern | No | File pattern to match (e.g., '*.md', '*.txt', '*') | *.md |
| validate_schema | No | Whether to validate document structure for knowledge base format | true |
| recursive | No | Whether to search subdirectories recursively | true |
| skip_existing | No | Skip files that already exist in index (check by filename) | false |
| max_file_size | No | Maximum file size in bytes to process | 1048576 (1 MB) |
| use_ai_enhancement | No | Use AI to generate intelligent tags and key points for each document | true |
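
For orientation, an invocation of this tool might look like the sketch below. This is a minimal example assuming a FastMCP-style Python client (`fastmcp.Client`); the server URL, index name, and directory path are placeholders rather than values taken from this server.

```python
# Hypothetical call sketch: argument names mirror the input schema above.
# The transport URL, index name, and directory are placeholders.
import asyncio
from fastmcp import Client

async def run_batch() -> None:
    async with Client("http://localhost:8000/mcp") as client:  # placeholder URL
        result = await client.call_tool(
            "batch_index_directory",
            {
                "index": "knowledge_base",      # placeholder index name
                "directory_path": "./docs",     # placeholder directory
                "file_pattern": "*.md",
                "recursive": True,
                "skip_existing": True,
                "use_ai_enhancement": False,    # basic metadata only, faster first pass
            },
        )
        print(result)

asyncio.run(run_batch())
```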

Implementation Reference

  • Complete handler implementation for the batch_index_directory tool: directory traversal, file filtering and reading, AI metadata generation, document validation, and Elasticsearch indexing, with error handling and a detailed result report. (A worked sketch of the filename-to-title/doc_id derivation appears after these reference listings.)
    @app.tool(
        description="Batch index all documents from a directory into Elasticsearch with AI-enhanced metadata generation and comprehensive file processing",
        tags={"elasticsearch", "batch", "directory", "index", "bulk", "ai-enhanced"}
    )
    async def batch_index_directory(
            index: Annotated[str, Field(description="Name of the Elasticsearch index to store the documents")],
            directory_path: Annotated[str, Field(description="Path to directory containing documents to index")],
            file_pattern: Annotated[str, Field(description="File pattern to match (e.g., '*.md', '*.txt', '*')")] = "*.md",
            validate_schema: Annotated[
                bool, Field(description="Whether to validate document structure for knowledge base format")] = True,
            recursive: Annotated[bool, Field(description="Whether to search subdirectories recursively")] = True,
            skip_existing: Annotated[
                bool, Field(description="Skip files that already exist in index (check by filename)")] = False,
            max_file_size: Annotated[
                int, Field(description="Maximum file size in bytes to process", ge=1, le=10485760)] = 1048576,
            # 1MB default
            use_ai_enhancement: Annotated[
                bool, Field(description="Use AI to generate intelligent tags and key points for each document")] = True,
            ctx: Context = None
    ) -> str:
        """Batch index all documents from a directory into Elasticsearch."""
        try:
            from pathlib import Path
            import os
    
            # Check directory exists and is valid
            directory = Path(directory_path)
            if not directory.exists():
                return f"❌ Directory not found: {directory_path}\nπŸ’‘ Check the directory path spelling and location"
    
            if not directory.is_dir():
                return f"❌ Path is not a directory: {directory_path}\nπŸ’‘ Provide a directory path, not a file path"
    
            # Get Elasticsearch client
            es = get_es_client()
    
            # Find all matching files
            if recursive:
                files = list(directory.rglob(file_pattern))
            else:
                files = list(directory.glob(file_pattern))
    
            if not files:
                return f"❌ No files found matching pattern '{file_pattern}' in directory: {directory_path}\nπŸ’‘ Try a different file pattern like '*.txt', '*.json', or '*'"
    
            # Filter out files that are too large
            valid_files = []
            skipped_size = []
            for file_path in files:
                if file_path.is_file():
                    try:
                        file_size = file_path.stat().st_size
                        if file_size <= max_file_size:
                            valid_files.append(file_path)
                        else:
                            skipped_size.append((file_path, file_size))
                    except Exception as e:
                        # Skip files we can't stat
                        continue
    
            if not valid_files:
                return f"❌ No valid files found (all files too large or inaccessible)\nπŸ’‘ Increase max_file_size or check file permissions"
    
            # Check for existing documents if skip_existing is True
            existing_docs = set()
            if skip_existing:
                try:
                    # Search for existing documents by titles
                    search_body = {
                        "query": {"match_all": {}},
                        "size": 10000,  # Get many docs to check
                        "_source": ["title", "id"]
                    }
                    existing_result = es.search(index=index, body=search_body)
                    for hit in existing_result['hits']['hits']:
                        source = hit.get('_source', {})
                        if 'title' in source:
                            existing_docs.add(source['title'])
                        if 'id' in source:
                            existing_docs.add(source['id'])
                except Exception:
                    # If we can't check existing docs, proceed anyway
                    pass
    
            # Process files
            successful = []
            failed = []
            skipped_existing = []
    
            for file_path in valid_files:
                try:
                    file_name = file_path.name
                    # Handle files with multiple dots properly (e.g., .post.md, .get.md)
                    clean_stem = file_path.name
                    if file_path.suffix:
                        clean_stem = file_path.name[:-len(file_path.suffix)]
                    title = clean_stem.replace('_', ' ').replace('-', ' ').replace('.', ' ').title()
    
                    # Skip if document with same title already exists in index
                    if skip_existing and title in existing_docs:
                        skipped_existing.append(file_name)
                        continue
    
                    # Read file content
                    try:
                        with open(file_path, 'r', encoding='utf-8') as f:
                            content = f.read()
                    except UnicodeDecodeError:
                        # Try with different encodings
                        try:
                            with open(file_path, 'r', encoding='latin-1') as f:
                                content = f.read()
                        except Exception as e:
                            failed.append((file_name, f"Encoding error: {str(e)}"))
                            continue
                    except Exception as e:
                        failed.append((file_name, f"Read error: {str(e)}"))
                        continue
    
                    # Create document from file
                    relative_path = file_path.relative_to(directory)
                    # Handle files with multiple dots (e.g., .post.md, .get.md) by using the full name without final extension
                    clean_stem = file_path.name
                    if file_path.suffix:
                        clean_stem = file_path.name[:-len(file_path.suffix)]
                    doc_id = f"{clean_stem.replace('.', '_')}_{hash(str(relative_path)) % 100000}"  # Create unique ID
    
                    # Initialize basic tags and key points
                    base_tags = [
                        "batch-indexed",
                        file_path.suffix[1:] if file_path.suffix else "no-extension",
                        directory.name
                    ]
    
                    base_key_points = [
                        f"Content length: {len(content)} characters",
                        f"Source directory: {directory.name}"
                    ]
    
                    final_tags = base_tags.copy()
                    final_key_points = base_key_points.copy()
                    final_summary = f"Document from {file_name}"
    
                    # Use AI enhancement if requested and context is available
                    if use_ai_enhancement and ctx and content.strip():
                        try:
                            await ctx.info(f"πŸ€– Generating AI metadata and smart content for: {file_name}")
                            ai_metadata = await generate_smart_metadata(title, content, ctx)
    
                            # Merge AI-generated tags with base tags
                            ai_tags = ai_metadata.get("tags", [])
                            for tag in ai_tags:
                                if tag not in final_tags:
                                    final_tags.append(tag)
    
                            # Merge AI-generated key points with base points
                            ai_key_points = ai_metadata.get("key_points", [])
                            for point in ai_key_points:
                                if point not in final_key_points:
                                    final_key_points.append(point)
    
                            # Use AI-generated smart summary and enhanced content
                            ai_summary = ai_metadata.get("smart_summary", "")
                            ai_enhanced_content = ai_metadata.get("enhanced_content", "")
    
                            if ai_summary:
                                final_summary = ai_summary
                            elif len(content) > 100:
                                # Fallback to content preview if no AI summary
                                content_preview = content[:300].strip()
                                if content_preview:
                                    final_summary = content_preview + ("..." if len(content) > 300 else "")
    
                            # Use enhanced content if available and substantially different
                            if ai_enhanced_content and len(ai_enhanced_content) > len(content) * 0.8:
                                content = ai_enhanced_content
    
                        except Exception as e:
                            await ctx.warning(f"AI enhancement failed for {file_name}: {str(e)}")
    
                    document = {
                        "id": doc_id,
                        "title": title,
                        "summary": final_summary,
                        "content": content,
                        "last_modified": datetime.now().isoformat(),
                        "priority": "medium",
                        "tags": final_tags,
                        "related": [],
                        "source_type": "documentation",
                        "key_points": final_key_points
                    }
    
                    # Validate document if requested
                    if validate_schema:
                        try:
                            validated_doc = validate_document_structure(document)
                            document = validated_doc
                        except DocumentValidationError as e:
                            failed.append((file_name, f"Validation error: {str(e)}"))
                            continue
                        except Exception as e:
                            failed.append((file_name, f"Validation error: {str(e)}"))
                            continue
    
                    # Index the document
                    try:
                        result = es.index(index=index, id=doc_id, body=document)
                        successful.append((file_name, doc_id, result.get('result', 'unknown')))
                    except Exception as e:
                        failed.append((file_name, f"Indexing error: {str(e)}"))
                        continue
    
                except Exception as e:
                    failed.append((file_path.name, f"Processing error: {str(e)}"))
                    continue
    
            # Build result summary
            total_processed = len(successful) + len(failed) + len(skipped_existing)
            result_summary = f"βœ… Batch indexing completed for directory: {directory_path}\n\n"
    
            # Summary statistics
            result_summary += f"πŸ“Š **Processing Summary**:\n"
            result_summary += f"   πŸ“ Directory: {directory_path}\n"
            result_summary += f"   πŸ” Pattern: {file_pattern} (recursive: {recursive})\n"
            result_summary += f"   πŸ“„ Files found: {len(files)}\n"
            result_summary += f"   βœ… Successfully indexed: {len(successful)}\n"
            result_summary += f"   ❌ Failed: {len(failed)}\n"
    
            if skipped_existing:
                result_summary += f"   ⏭️ Skipped (already exist): {len(skipped_existing)}\n"
    
            if skipped_size:
                result_summary += f"   πŸ“ Skipped (too large): {len(skipped_size)}\n"
    
            result_summary += f"   🎯 Index: {index}\n"
    
            # AI Enhancement info
            if use_ai_enhancement and ctx:
                result_summary += f"   πŸ€– AI Enhancement: Enabled (generated intelligent tags and key points)\n"
            else:
                result_summary += f"   πŸ€– AI Enhancement: Disabled (using basic metadata)\n"
    
            result_summary += "\n"
    
            # Successful indexing details
            if successful:
                result_summary += f"βœ… **Successfully Indexed** ({len(successful)} files):\n"
                for file_name, doc_id, index_result in successful[:10]:  # Show first 10
                    result_summary += f"   πŸ“„ {file_name} β†’ {doc_id} ({index_result})\n"
                if len(successful) > 10:
                    result_summary += f"   ... and {len(successful) - 10} more files\n"
                result_summary += "\n"
    
            # Failed indexing details
            if failed:
                result_summary += f"❌ **Failed to Index** ({len(failed)} files):\n"
                for file_name, error_msg in failed[:5]:  # Show first 5 errors
                    result_summary += f"   πŸ“„ {file_name}: {error_msg}\n"
                if len(failed) > 5:
                    result_summary += f"   ... and {len(failed) - 5} more errors\n"
                result_summary += "\n"
    
            # Skipped files details
            if skipped_existing:
                result_summary += f"⏭️ **Skipped (Already Exist)** ({len(skipped_existing)} files):\n"
                for file_name in skipped_existing[:5]:
                    result_summary += f"   πŸ“„ {file_name}\n"
                if len(skipped_existing) > 5:
                    result_summary += f"   ... and {len(skipped_existing) - 5} more files\n"
                result_summary += "\n"
    
            if skipped_size:
                result_summary += f"πŸ“ **Skipped (Too Large)** ({len(skipped_size)} files):\n"
                for file_path, file_size in skipped_size[:3]:
                    size_mb = file_size / 1048576
                    result_summary += f"   πŸ“„ {file_path.name}: {size_mb:.1f} MB\n"
                if len(skipped_size) > 3:
                    result_summary += f"   ... and {len(skipped_size) - 3} more large files\n"
                result_summary += f"   πŸ’‘ Increase max_file_size to include these files\n\n"
    
            # Performance tips
            if len(successful) > 0:
                result_summary += f"πŸš€ **Performance Tips for Future Batches**:\n"
                result_summary += f"   πŸ”„ Use skip_existing=True to avoid reindexing\n"
                result_summary += f"   πŸ“‚ Process subdirectories separately for better control\n"
                result_summary += f"   πŸ” Use specific file patterns (*.md, *.txt) for faster processing\n"
                result_summary += f"   πŸ“ Adjust max_file_size based on your content needs\n"
                if use_ai_enhancement:
                    result_summary += f"   πŸ€– AI enhancement adds ~2-3 seconds per file but greatly improves metadata quality\n"
                    result_summary += f"   ⚑ Set use_ai_enhancement=False for faster processing with basic metadata\n"
                else:
                    result_summary += f"   πŸ€– Enable use_ai_enhancement=True for intelligent tags and key points\n"
                result_summary += "\n"
    
            # Knowledge base recommendations
            if len(successful) > 20:
                result_summary += f"🧹 **Knowledge Base Organization Recommendation**:\n"
                result_summary += f"   πŸ“Š You've indexed {len(successful)} documents from this batch\n"
                result_summary += f"   πŸ’‘ Consider organizing them by topics or themes\n"
                result_summary += f"   πŸ” Use the 'search' tool to find related documents for consolidation\n"
                result_summary += f"   🎯 Group similar content to improve knowledge base quality\n"
    
            return result_summary
    
        except Exception as e:
            error_message = "❌ Batch indexing failed:\n\n"
    
            error_str = str(e).lower()
            if "connection" in error_str or "refused" in error_str:
                error_message += "πŸ”Œ **Connection Error**: Cannot connect to Elasticsearch server\n"
                error_message += f"πŸ“ Check if Elasticsearch is running at the configured address\n"
                error_message += f"πŸ’‘ Try: Use 'setup_elasticsearch' tool to start Elasticsearch\n\n"
            elif ("index" in error_str and "not found" in error_str) or "index_not_found_exception" in error_str:
                error_message += f"πŸ“ **Index Error**: Index '{index}' does not exist\n"
                error_message += f"πŸ“ The target index has not been created yet\n"
                error_message += f"πŸ’‘ Try: Use 'create_index' tool to create the index first\n\n"
            elif "permission" in error_str or "access denied" in error_str:
                error_message += f"πŸ”’ **Permission Error**: Access denied to directory or files\n"
                error_message += f"πŸ“ Insufficient permissions to read directory or files\n"
                error_message += f"πŸ’‘ Try: Check directory permissions or verify file access rights\n\n"
            else:
                error_message += f"⚠️ **Unknown Error**: {str(e)}\n\n"
    
            error_message += f"πŸ” **Technical Details**: {str(e)}"
            return error_message
  • Input schema defined with Pydantic Annotated[..., Field(...)] annotations on every parameter, including descriptions, defaults, and constraints.
    async def batch_index_directory(
            index: Annotated[str, Field(description="Name of the Elasticsearch index to store the documents")],
            directory_path: Annotated[str, Field(description="Path to directory containing documents to index")],
            file_pattern: Annotated[str, Field(description="File pattern to match (e.g., '*.md', '*.txt', '*')")] = "*.md",
            validate_schema: Annotated[
                bool, Field(description="Whether to validate document structure for knowledge base format")] = True,
            recursive: Annotated[bool, Field(description="Whether to search subdirectories recursively")] = True,
            skip_existing: Annotated[
                bool, Field(description="Skip files that already exist in index (check by filename)")] = False,
            max_file_size: Annotated[
                int, Field(description="Maximum file size in bytes to process", ge=1, le=10485760)] = 1048576,
            # 1MB default
            use_ai_enhancement: Annotated[
                bool, Field(description="Use AI to generate intelligent tags and key points for each document")] = True,
            ctx: Context = None
    ) -> str:
  • Registration: the batch sub-server is imported and mounted into the main unified Elasticsearch FastMCP server via app.mount(), exposing batch_index_directory as part of the 16-tool interface.
    from .sub_servers.elasticsearch_batch import app as batch_app
    
    # Create unified FastMCP application
    app = FastMCP(
        name="AgentKnowledgeMCP-Elasticsearch",
        version="2.0.0",
        instructions="Unified Elasticsearch tools for comprehensive knowledge management via modular server mounting"
    )
    
    # ================================
    # SERVER MOUNTING - MODULAR ARCHITECTURE
    # ================================
    
    print("πŸ—οΈ Mounting Elasticsearch sub-servers...")
    
    # Mount all sub-servers into unified interface
    app.mount(snapshots_app)           # 3 tools: snapshot management
    app.mount(index_metadata_app)      # 3 tools: metadata governance  
    app.mount(document_app)            # 3 tools: document operations
    app.mount(index_app)               # 3 tools: index management
    app.mount(search_app)              # 2 tools: search & validation
    app.mount(batch_app)               # 2 tools: batch operations
  • Local tool registration on the elasticsearch_batch FastMCP app using the @app.tool decorator.
    @app.tool(
        description="Batch index all documents from a directory into Elasticsearch with AI-enhanced metadata generation and comprehensive file processing",
        tags={"elasticsearch", "batch", "directory", "index", "bulk", "ai-enhanced"}
    )
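
Because the full handler is long, the filename-to-metadata derivation is easy to miss, so here is that step in isolation. The snippet below copies the logic from the handler above; the file name and relative path are made-up examples.

```python
# Worked sketch of the title/doc_id derivation from batch_index_directory.
# "api/users.post.md" is an illustrative path, not a real file.
from pathlib import Path

file_path = Path("api/users.post.md")   # hypothetical file found by the glob
relative_path = file_path                # path relative to the scanned directory

# Strip only the final extension so multi-dot names keep their middle segments.
clean_stem = file_path.name
if file_path.suffix:
    clean_stem = file_path.name[:-len(file_path.suffix)]   # "users.post"

# Title: separators become spaces, then Title Case.
title = clean_stem.replace('_', ' ').replace('-', ' ').replace('.', ' ').title()
# -> "Users Post"

# Document ID: dots become underscores plus a numeric suffix derived from the
# relative path. Python's built-in hash() is randomized per process, so the
# suffix is not stable across separate runs of the server.
doc_id = f"{clean_stem.replace('.', '_')}_{hash(str(relative_path)) % 100000}"
print(title, doc_id)   # e.g. "Users Post users_post_12345"
```

Since skip_existing compares these derived titles (and stored ids) against documents already in the index, renaming a file changes its derived title and the file will be indexed again rather than skipped.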
