create_snapshot
Generate a snapshot of Elasticsearch indices with customizable options, including repository selection, index inclusion, and global state management, for comprehensive backup and recovery.
Instructions
Create a snapshot (backup) of Elasticsearch indices with comprehensive options and repository management
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| description | No | Optional description for the snapshot | |
| ignore_unavailable | No | Whether to ignore unavailable indices | |
| include_global_state | No | Whether to include cluster global state | |
| indices | No | Comma-separated list of indices to backup (default: all indices) | |
| repository | No | Repository name to store the snapshot | backup_repository |
| snapshot_name | Yes | Name for the snapshot (must be unique) | |
| wait_for_completion | No | Whether to wait for snapshot completion |
Implementation Reference
- Primary handler implementation for the 'create_snapshot' MCP tool. Includes input schema via Annotated Pydantic Fields, full execution logic for snapshot creation (with auto-repo creation), comprehensive error handling with user-friendly messages, and detailed success reporting.@app.tool( description="Create a snapshot (backup) of Elasticsearch indices with comprehensive options and repository management", tags={"elasticsearch", "snapshot", "backup", "repository"} ) async def create_snapshot( snapshot_name: Annotated[str, Field(description="Name for the snapshot (must be unique)")], repository: Annotated[str, Field(description="Repository name to store the snapshot")] = "backup_repository", indices: Annotated[Optional[str], Field( description="Comma-separated list of indices to backup (default: all indices)")] = None, ignore_unavailable: Annotated[bool, Field(description="Whether to ignore unavailable indices")] = True, include_global_state: Annotated[bool, Field(description="Whether to include cluster global state")] = True, wait_for_completion: Annotated[bool, Field(description="Whether to wait for snapshot completion")] = True, description: Annotated[Optional[str], Field(description="Optional description for the snapshot")] = None ) -> str: """Create a snapshot (backup) of Elasticsearch indices.""" try: es = get_es_client() # Check if repository exists, create if not try: repo_info = es.snapshot.get_repository(repository=repository) except: # Repository doesn't exist, create default file system repository repo_body = { "type": "fs", "settings": { "location": f"/usr/share/elasticsearch/snapshots/{repository}", "compress": True } } try: es.snapshot.create_repository(repository=repository, body=repo_body) repo_created = True except Exception as repo_error: return (f"β Failed to create snapshot repository:\n\n" + f"π§ **Repository Error**: Cannot create repository '{repository}'\n" + f"π **Issue**: {str(repo_error)}\n\n" + f"π‘ **Common Solutions**:\n" + f" 1. Ensure Elasticsearch has write permissions to snapshot directory\n" + f" 2. Add 'path.repo: [\"/usr/share/elasticsearch/snapshots\"]' to elasticsearch.yml\n" + f" 3. Restart Elasticsearch after configuration change\n" + f" 4. Or use existing repository name\n\n" + f"π **Technical Details**: {str(repo_error)}") else: repo_created = False # Parse indices parameter if indices: indices_list = [idx.strip() for idx in indices.split(',')] indices_param = ','.join(indices_list) else: indices_param = "*" # All indices indices_list = ["*"] # Create snapshot metadata snapshot_body = { "indices": indices_param, "ignore_unavailable": ignore_unavailable, "include_global_state": include_global_state } if description: snapshot_body["metadata"] = { "description": description, "created_by": "AgentKnowledgeMCP", "created_at": datetime.now().isoformat() } # Create the snapshot snapshot_result = es.snapshot.create( repository=repository, snapshot=snapshot_name, body=snapshot_body, wait_for_completion=wait_for_completion ) # Format response based on completion if wait_for_completion: snapshot_info = snapshot_result.get('snapshot', {}) state = snapshot_info.get('state', 'UNKNOWN') if state == 'SUCCESS': status_emoji = "β " status_msg = "Successfully completed" elif state == 'PARTIAL': status_emoji = "β οΈ" status_msg = "Partially completed with some issues" elif state == 'FAILED': status_emoji = "β" status_msg = "Failed to complete" else: status_emoji = "π" status_msg = f"Status: {state}" result_message = (f"{status_emoji} Snapshot '{snapshot_name}' {status_msg}!\n\n" + f"πΈ **Snapshot Details**:\n" + f" π Repository: {repository}\n" + f" π Name: {snapshot_name}\n" + f" π State: {state}\n" + f" π¦ Indices: {', '.join(indices_list)}\n" + f" π Global State: {'Included' if include_global_state else 'Excluded'}\n") if snapshot_info.get('shards'): shards = snapshot_info['shards'] result_message += (f" π’ Shards: {shards.get('total', 0)} total, " + f"{shards.get('successful', 0)} successful, " + f"{shards.get('failed', 0)} failed\n") if snapshot_info.get('start_time_in_millis') and snapshot_info.get('end_time_in_millis'): duration = (snapshot_info['end_time_in_millis'] - snapshot_info['start_time_in_millis']) / 1000 result_message += f" β±οΈ Duration: {duration:.2f} seconds\n" if description: result_message += f" π Description: {description}\n" else: result_message = (f"π Snapshot '{snapshot_name}' started!\n\n" + f"πΈ **Snapshot Details**:\n" + f" π Repository: {repository}\n" + f" π Name: {snapshot_name}\n" + f" π Status: Running in background\n" + f" π¦ Indices: {', '.join(indices_list)}\n" + f" π Global State: {'Included' if include_global_state else 'Excluded'}\n") if repo_created: result_message += f"\nπ **Repository Created**: Created new repository '{repository}'\n" result_message += (f"\nβ **Success Actions**:\n" + f" πΈ Snapshot backup is {'completed' if wait_for_completion else 'in progress'}\n" + f" π Use 'list_snapshots' to view all snapshots\n" + f" π Use 'restore_snapshot' to restore from this backup\n" + f" π Check snapshot status with repository '{repository}'\n\n" + f"πΎ **Backup Strategy**:\n" + f" π Regular snapshots help protect against data loss\n" + f" π·οΈ Use descriptive snapshot names with dates\n" + f" π Monitor repository storage space\n" + f" π§Ή Clean up old snapshots periodically") return result_message except Exception as e: error_message = "β Failed to create snapshot:\n\n" error_str = str(e).lower() if "connection" in error_str or "refused" in error_str: error_message += "π **Connection Error**: Cannot connect to Elasticsearch server\n" error_message += f"π Check if Elasticsearch is running at the configured address\n" error_message += f"π‘ Try: Use 'setup_elasticsearch' tool to start Elasticsearch\n\n" elif "repository" in error_str and ("not found" in error_str or "missing" in error_str): error_message += f"π **Repository Error**: Repository '{repository}' not found or misconfigured\n" error_message += f"π Check repository configuration and permissions\n" error_message += f"π‘ Try: Use different repository name or check path.repo settings\n\n" elif "invalid_snapshot_name" in error_str: error_message += f"π·οΈ **Naming Error**: Invalid snapshot name '{snapshot_name}'\n" error_message += f"π Snapshot names must be lowercase and cannot contain certain characters\n" error_message += f"π‘ Try: Use alphanumeric characters and hyphens only\n\n" elif "already_exists" in error_str: error_message += f"π **Conflict Error**: Snapshot '{snapshot_name}' already exists\n" error_message += f"π Each snapshot must have a unique name within the repository\n" error_message += f"π‘ Try: Use different snapshot name or delete existing snapshot\n\n" else: error_message += f"β οΈ **Unknown Error**: {str(e)}\n\n" error_message += f"π **Technical Details**: {str(e)}" return error_message
- src/elasticsearch/elasticsearch_server.py:25-46 (registration)Registration/mounting of the elasticsearch_snapshots sub-server into the main unified Elasticsearch FastMCP app, exposing the 'create_snapshot' tool (along with restore_snapshot and list_snapshots).from .sub_servers.elasticsearch_snapshots import app as snapshots_app from .sub_servers.elasticsearch_index_metadata import app as index_metadata_app from .sub_servers.elasticsearch_document import app as document_app from .sub_servers.elasticsearch_index import app as index_app from .sub_servers.elasticsearch_search import app as search_app from .sub_servers.elasticsearch_batch import app as batch_app # Create unified FastMCP application app = FastMCP( name="AgentKnowledgeMCP-Elasticsearch", version="2.0.0", instructions="Unified Elasticsearch tools for comprehensive knowledge management via modular server mounting" ) # ================================ # SERVER MOUNTING - MODULAR ARCHITECTURE # ================================ print("ποΈ Mounting Elasticsearch sub-servers...") # Mount all sub-servers into unified interface app.mount(snapshots_app) # 3 tools: snapshot management
- Input schema definition using Pydantic Annotated types and Field descriptions for the create_snapshot tool parameters.async def create_snapshot( snapshot_name: Annotated[str, Field(description="Name for the snapshot (must be unique)")], repository: Annotated[str, Field(description="Repository name to store the snapshot")] = "backup_repository", indices: Annotated[Optional[str], Field( description="Comma-separated list of indices to backup (default: all indices)")] = None, ignore_unavailable: Annotated[bool, Field(description="Whether to ignore unavailable indices")] = True, include_global_state: Annotated[bool, Field(description="Whether to include cluster global state")] = True, wait_for_completion: Annotated[bool, Field(description="Whether to wait for snapshot completion")] = True, description: Annotated[Optional[str], Field(description="Optional description for the snapshot")] = None ) -> str:
- Local registration of the create_snapshot tool on the sub-server FastMCP app instance.@app.tool( description="Create a snapshot (backup) of Elasticsearch indices with comprehensive options and repository management", tags={"elasticsearch", "snapshot", "backup", "repository"} )