restore_snapshot
Restore Elasticsearch indices from snapshots with configurable options for selective recovery, conflict resolution, and index customization.
Instructions
Restore indices from an Elasticsearch snapshot with comprehensive options and conflict resolution
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| snapshot_name | Yes | Name of the snapshot to restore from | |
| repository | No | Repository containing the snapshot | backup_repository |
| indices | No | Comma-separated list of indices to restore (default: all from snapshot) | |
| ignore_unavailable | No | Whether to ignore unavailable indices | True |
| include_global_state | No | Whether to restore cluster global state | False |
| wait_for_completion | No | Whether to wait for restore completion | True |
| rename_pattern | No | Pattern to rename restored indices (e.g., 'restored_%s') | |
| index_settings | No | JSON string of index settings to override | |
Implementation Reference
- Full implementation of the 'restore_snapshot' tool handler. Includes @app.tool decorator for registration and schema definition via Annotated[Field] parameters. Executes Elasticsearch snapshot restore with validation, conflict detection, and detailed reporting.@app.tool( description="Restore indices from an Elasticsearch snapshot with comprehensive options and conflict resolution", tags={"elasticsearch", "snapshot", "restore", "rollback"} ) async def restore_snapshot( snapshot_name: Annotated[str, Field(description="Name of the snapshot to restore from")], repository: Annotated[str, Field(description="Repository containing the snapshot")] = "backup_repository", indices: Annotated[Optional[str], Field( description="Comma-separated list of indices to restore (default: all from snapshot)")] = None, ignore_unavailable: Annotated[bool, Field(description="Whether to ignore unavailable indices")] = True, include_global_state: Annotated[bool, Field(description="Whether to restore cluster global state")] = False, wait_for_completion: Annotated[bool, Field(description="Whether to wait for restore completion")] = True, rename_pattern: Annotated[ Optional[str], Field(description="Pattern to rename restored indices (e.g., 'restored_%s')")] = None, index_settings: Annotated[Optional[str], Field(description="JSON string of index settings to override")] = None ) -> str: """Restore indices from an Elasticsearch snapshot.""" try: es = get_es_client() # Verify repository exists try: repo_info = es.snapshot.get_repository(repository=repository) except: return (f"โ Repository '{repository}' not found!\n\n" + f"๐ **Repository Error**: Cannot access snapshot repository\n" + f"๐ **Available Actions**:\n" + f" 1. Check repository name spelling\n" + f" 2. Use 'create_snapshot' to create repository first\n" + f" 3. 
Verify Elasticsearch path.repo configuration\n\n" + f"๐ก **Tip**: Repositories must be configured before accessing snapshots") # Verify snapshot exists try: snapshot_info = es.snapshot.get(repository=repository, snapshot=snapshot_name) except: return (f"โ Snapshot '{snapshot_name}' not found in repository '{repository}'!\n\n" + f"๐ธ **Snapshot Error**: Cannot find the specified snapshot\n" + f"๐ **Possible Issues**:\n" + f" 1. Snapshot name is incorrect\n" + f" 2. Snapshot was deleted or corrupted\n" + f" 3. Repository path has changed\n\n" + f"๐ **Next Steps**:\n" + f" โข Use 'list_snapshots' to see available snapshots\n" + f" โข Check repository configuration and permissions\n" + f" โข Verify backup storage accessibility") # Parse indices parameter if indices: indices_list = [idx.strip() for idx in indices.split(',')] indices_param = ','.join(indices_list) else: indices_param = None # Restore all indices from snapshot indices_list = ["all"] # Build restore body restore_body = { "ignore_unavailable": ignore_unavailable, "include_global_state": include_global_state } if indices_param: restore_body["indices"] = indices_param if rename_pattern: restore_body["rename_pattern"] = rename_pattern restore_body["rename_replacement"] = rename_pattern if index_settings: try: settings_dict = json.loads(index_settings) restore_body["index_settings"] = settings_dict except json.JSONDecodeError: return (f"โ Invalid JSON in index_settings parameter!\n\n" + f"๐ **JSON Error**: Cannot parse index settings\n" + f"๐ **Provided**: {index_settings}\n" + f"๐ก **Example**: '{{\"number_of_replicas\": 0, \"refresh_interval\": \"30s\"}}'") # Check for potential conflicts (existing indices) conflicts = [] if indices_list and indices_list != ["all"]: for index_name in indices_list: if rename_pattern: # If renaming, check the new name new_name = rename_pattern.replace('%s', index_name) try: es.indices.get(index=new_name) conflicts.append(f"{index_name} -> {new_name}") except: pass # Index 
doesn't exist, no conflict else: # Direct restore, check original name try: es.indices.get(index=index_name) conflicts.append(index_name) except: pass # Index doesn't exist, no conflict # Warn about conflicts conflict_warning = "" if conflicts and not rename_pattern: conflict_warning = (f"\nโ ๏ธ **Warning - Existing Indices Will Be Overwritten**:\n" + f" ๐ Conflicting indices: {', '.join(conflicts)}\n" + f" ๐ These indices will be closed and replaced\n" + f" ๐ก Consider using rename_pattern to avoid conflicts\n\n") # Execute restore restore_result = es.snapshot.restore( repository=repository, snapshot=snapshot_name, body=restore_body, wait_for_completion=wait_for_completion ) # Get snapshot details for reporting snapshot_details = snapshot_info['snapshots'][0] if snapshot_info.get('snapshots') else {} snapshot_state = snapshot_details.get('state', 'UNKNOWN') # Format response based on completion if wait_for_completion: restore_info = restore_result.get('snapshot', {}) shards_info = restore_info.get('shards', {}) result_message = (f"โ Snapshot '{snapshot_name}' restored successfully!\n\n" + f"๐ **Restore Details**:\n" + f" ๐ Repository: {repository}\n" + f" ๐ธ Snapshot: {snapshot_name}\n" + f" ๐ Snapshot State: {snapshot_state}\n" + f" ๐ฆ Restored Indices: {', '.join(indices_list)}\n" + f" ๐ Global State: {'Restored' if include_global_state else 'Skipped'}\n") if rename_pattern: result_message += f" ๐ท๏ธ Rename Pattern: {rename_pattern}\n" if shards_info: result_message += (f" ๐ข Shards: {shards_info.get('total', 0)} total, " + f"{shards_info.get('successful', 0)} successful, " + f"{shards_info.get('failed', 0)} failed\n") else: result_message = (f"๐ Snapshot restore started!\n\n" + f"๐ **Restore Details**:\n" + f" ๐ Repository: {repository}\n" + f" ๐ธ Snapshot: {snapshot_name}\n" + f" ๐ Status: Running in background\n" + f" ๐ฆ Indices: {', '.join(indices_list)}\n" + f" ๐ Global State: {'Included' if include_global_state else 'Excluded'}\n") if conflict_warning: 
result_message += conflict_warning result_message += (f"\nโ **Restore Complete**:\n" + f" ๐ Data has been {'restored' if wait_for_completion else 'restore started'}\n" + f" ๐ Use 'list_indices' to verify restored indices\n" + f" ๐ Check cluster health and index status\n" + f" ๐งช Test restored data integrity\n\n" + f"๐ **Post-Restore Checklist**:\n" + f" โ Verify all expected indices are present\n" + f" โ Check document counts match expectations\n" + f" โ Test search functionality on restored data\n" + f" โ Monitor cluster performance after restore") return result_message
- Pydantic-based input schema definition for the restore_snapshot tool parameters, providing validation and descriptions.snapshot_name: Annotated[str, Field(description="Name of the snapshot to restore from")], repository: Annotated[str, Field(description="Repository containing the snapshot")] = "backup_repository", indices: Annotated[Optional[str], Field( description="Comma-separated list of indices to restore (default: all from snapshot)")] = None, ignore_unavailable: Annotated[bool, Field(description="Whether to ignore unavailable indices")] = True, include_global_state: Annotated[bool, Field(description="Whether to restore cluster global state")] = False, wait_for_completion: Annotated[bool, Field(description="Whether to wait for restore completion")] = True, rename_pattern: Annotated[ Optional[str], Field(description="Pattern to rename restored indices (e.g., 'restored_%s')")] = None, index_settings: Annotated[Optional[str], Field(description="JSON string of index settings to override")] = None ) -> str:
- src/elasticsearch/elasticsearch_server.py:25-47 (registration)Mounts the elasticsearch_snapshots sub-server (containing restore_snapshot) into the main Elasticsearch FastMCP app, registering the tool globally.from .sub_servers.elasticsearch_snapshots import app as snapshots_app from .sub_servers.elasticsearch_index_metadata import app as index_metadata_app from .sub_servers.elasticsearch_document import app as document_app from .sub_servers.elasticsearch_index import app as index_app from .sub_servers.elasticsearch_search import app as search_app from .sub_servers.elasticsearch_batch import app as batch_app # Create unified FastMCP application app = FastMCP( name="AgentKnowledgeMCP-Elasticsearch", version="2.0.0", instructions="Unified Elasticsearch tools for comprehensive knowledge management via modular server mounting" ) # ================================ # SERVER MOUNTING - MODULAR ARCHITECTURE # ================================ print("๐๏ธ Mounting Elasticsearch sub-servers...") # Mount all sub-servers into unified interface app.mount(snapshots_app) # 3 tools: snapshot management app.mount(index_metadata_app) # 3 tools: metadata governance
- Local registration of the restore_snapshot tool on the sub-server FastMCP app via @app.tool decorator.# Repository doesn't exist, create default file system repository repo_body = { "type": "fs", "settings": { "location": f"/usr/share/elasticsearch/snapshots/{repository}", "compress": True } } try: es.snapshot.create_repository(repository=repository, body=repo_body) repo_created = True except Exception as repo_error: return (f"โ Failed to create snapshot repository:\n\n" + f"๐ง **Repository Error**: Cannot create repository '{repository}'\n" + f"๐ **Issue**: {str(repo_error)}\n\n" + f"๐ก **Common Solutions**:\n" + f" 1. Ensure Elasticsearch has write permissions to snapshot directory\n" + f" 2. Add 'path.repo: [\"/usr/share/elasticsearch/snapshots\"]' to elasticsearch.yml\n" + f" 3. Restart Elasticsearch after configuration change\n" + f" 4. Or use existing repository name\n\n" + f"๐ **Technical Details**: {str(repo_error)}") else: repo_created = False # Parse indices parameter if indices: indices_list = [idx.strip() for idx in indices.split(',')] indices_param = ','.join(indices_list) else: indices_param = "*" # All indices indices_list = ["*"] # Create snapshot metadata snapshot_body = { "indices": indices_param, "ignore_unavailable": ignore_unavailable, "include_global_state": include_global_state } if description: snapshot_body["metadata"] = { "description": description, "created_by": "AgentKnowledgeMCP", "created_at": datetime.now().isoformat() } # Create the snapshot snapshot_result = es.snapshot.create( repository=repository, snapshot=snapshot_name, body=snapshot_body, wait_for_completion=wait_for_completion ) # Format response based on completion if wait_for_completion: snapshot_info = snapshot_result.get('snapshot', {}) state = snapshot_info.get('state', 'UNKNOWN') if state == 'SUCCESS': status_emoji = "โ " status_msg = "Successfully completed" elif state == 'PARTIAL': status_emoji = "โ ๏ธ" status_msg = "Partially completed with some issues" elif state 
== 'FAILED': status_emoji = "โ" status_msg = "Failed to complete" else: status_emoji = "๐" status_msg = f"Status: {state}" result_message = (f"{status_emoji} Snapshot '{snapshot_name}' {status_msg}!\n\n" + f"๐ธ **Snapshot Details**:\n" + f" ๐ Repository: {repository}\n" + f" ๐ Name: {snapshot_name}\n" + f" ๐ State: {state}\n" + f" ๐ฆ Indices: {', '.join(indices_list)}\n" + f" ๐ Global State: {'Included' if include_global_state else 'Excluded'}\n") if snapshot_info.get('shards'): shards = snapshot_info['shards'] result_message += (f" ๐ข Shards: {shards.get('total', 0)} total, " + f"{shards.get('successful', 0)} successful, " + f"{shards.get('failed', 0)} failed\n") if snapshot_info.get('start_time_in_millis') and snapshot_info.get('end_time_in_millis'): duration = (snapshot_info['end_time_in_millis'] - snapshot_info['start_time_in_millis']) / 1000 result_message += f" โฑ๏ธ Duration: {duration:.2f} seconds\n" if description: result_message += f" ๐ Description: {description}\n" else: result_message = (f"๐ Snapshot '{snapshot_name}' started!\n\n" + f"๐ธ **Snapshot Details**:\n" + f" ๐ Repository: {repository}\n" + f" ๐ Name: {snapshot_name}\n" + f" ๐ Status: Running in background\n" + f" ๐ฆ Indices: {', '.join(indices_list)}\n" + f" ๐ Global State: {'Included' if include_global_state else 'Excluded'}\n") if repo_created: result_message += f"\n๐ **Repository Created**: Created new repository '{repository}'\n" result_message += (f"\nโ **Success Actions**:\n" + f" ๐ธ Snapshot backup is {'completed' if wait_for_completion else 'in progress'}\n" + f" ๐ Use 'list_snapshots' to view all snapshots\n" + f" ๐ Use 'restore_snapshot' to restore from this backup\n" + f" ๐ Check snapshot status with repository '{repository}'\n\n" + f"๐พ **Backup Strategy**:\n" + f" ๐ Regular snapshots help protect against data loss\n" + f" ๐ท๏ธ Use descriptive snapshot names with dates\n" + f" ๐ Monitor repository storage space\n" + f" ๐งน Clean up old snapshots periodically") return result_message 
except Exception as e: error_message = "โ Failed to create snapshot:\n\n" error_str = str(e).lower() if "connection" in error_str or "refused" in error_str: error_message += "๐ **Connection Error**: Cannot connect to Elasticsearch server\n" error_message += f"๐ Check if Elasticsearch is running at the configured address\n" error_message += f"๐ก Try: Use 'setup_elasticsearch' tool to start Elasticsearch\n\n" elif "repository" in error_str and ("not found" in error_str or "missing" in error_str): error_message += f"๐ **Repository Error**: Repository '{repository}' not found or misconfigured\n" error_message += f"๐ Check repository configuration and permissions\n" error_message += f"๐ก Try: Use different repository name or check path.repo settings\n\n" elif "invalid_snapshot_name" in error_str: error_message += f"๐ท๏ธ **Naming Error**: Invalid snapshot name '{snapshot_name}'\n" error_message += f"๐ Snapshot names must be lowercase and cannot contain certain characters\n" error_message += f"๐ก Try: Use alphanumeric characters and hyphens only\n\n" elif "already_exists" in error_str: error_message += f"๐ **Conflict Error**: Snapshot '{snapshot_name}' already exists\n" error_message += f"๐ Each snapshot must have a unique name within the repository\n" error_message += f"๐ก Try: Use different snapshot name or delete existing snapshot\n\n" else: error_message += f"โ ๏ธ **Unknown Error**: {str(e)}\n\n" error_message += f"๐ **Technical Details**: {str(e)}" return error_message # ================================ # TOOL 15: RESTORE_SNAPSHOT # ================================ @app.tool(