get_recovery_status
Check recovery status and estimated completion time for OpenSearch cluster operations to monitor progress and plan next steps.
Instructions
Get recovery status and estimated completion time
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
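Because the tool takes no arguments, a call only needs the tool name and an empty `arguments` object. The sketch below builds such a request in the JSON-RPC `tools/call` shape used by the Model Context Protocol. The helper name `build_tool_call` and the request id are illustrative assumptions, and the transport (stdio, HTTP, and so on) is not shown.

```python
import json


def build_tool_call(name: str, request_id: int = 1) -> dict:
    """Build a JSON-RPC 2.0 'tools/call' request for a tool with no arguments.

    Hypothetical helper for illustration; it is not part of the server code.
    """
    return {
        "jsonrpc": "2.0",
        "id": request_id,  # arbitrary id chosen by the client
        "method": "tools/call",
        "params": {"name": name, "arguments": {}},  # empty: no input schema fields
    }


request = build_tool_call("get_recovery_status")
print(json.dumps(request, indent=2))
```

An MCP client library would normally construct this message itself; the point here is only that `arguments` stays empty.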
Implementation Reference
- The main handler function for the 'get_recovery_status' tool. It fetches active shard recoveries from OpenSearch using cat.recovery API, calculates progress (files and bytes percentage), recovery rate (MB/sec), and estimates remaining time. If no active recoveries, falls back to cluster health summary. Returns formatted text output as list[TextContent].

      async def get_recovery_status() -> list[TextContent]:
          """
          Get recovery status for shards that are currently being recovered.
          Includes progress percentage and estimated time remaining based on current recovery rate.
          """
          self.logger.info("Fetching recovery status...")
          try:
              # Get active recoveries with detailed stats
              response = self.es_client.cat.recovery(format='json', active_only=True, v=True)

              if not response:
                  # Get cluster health to show overall shard status if no active recoveries
                  health = self.es_client.cluster.health()
                  total_shards = health['active_shards'] + health['unassigned_shards'] + health['initializing_shards']
                  active_pct = (health['active_shards'] / total_shards) * 100 if total_shards > 0 else 100

                  status_msg = (
                      f"No active recoveries. Cluster status: {health['status']}\n"
                      f"Active shards: {health['active_shards']}/{total_shards} ({active_pct:.1f}%)\n"
                      f"Initializing: {health['initializing_shards']}\n"
                      f"Unassigned: {health['unassigned_shards']}"
                  )
                  return [TextContent(type="text", text=status_msg)]

              # Process active recoveries
              summary = []
              for recovery in response:
                  index = recovery['index']
                  shard = recovery['shard']
                  stage = recovery.get('stage', 'unknown')

                  # Calculate progress and time remaining
                  files_pct = float(recovery.get('files_percent', '0').rstrip('%'))
                  bytes_pct = float(recovery.get('bytes_percent', '0').rstrip('%'))
                  total_bytes = int(recovery.get('total_bytes', 0))
                  bytes_recovered = int(recovery.get('recovered_in_bytes', 0))

                  # Parse time value which can be in format like "1.2s" or "3m" or "2.5h"
                  time_str = recovery.get('time', '0s')
                  try:
                      # Convert time string to milliseconds
                      if time_str.endswith('ms'):
                          time_spent_ms = float(time_str[:-2])
                      elif time_str.endswith('s'):
                          time_spent_ms = float(time_str[:-1]) * 1000
                      elif time_str.endswith('m'):
                          time_spent_ms = float(time_str[:-1]) * 60 * 1000
                      elif time_str.endswith('h'):
                          time_spent_ms = float(time_str[:-1]) * 60 * 60 * 1000
                      else:
                          time_spent_ms = 0
                  except ValueError:
                      time_spent_ms = 0

                  # Calculate recovery rate and estimated time remaining
                  if bytes_recovered > 0 and time_spent_ms > 0:
                      rate_mb_sec = (bytes_recovered / 1024 / 1024) / (time_spent_ms / 1000)
                      remaining_bytes = total_bytes - bytes_recovered
                      est_seconds_remaining = (remaining_bytes / 1024 / 1024) / rate_mb_sec if rate_mb_sec > 0 else 0

                      # Format time remaining in a human-readable way
                      if est_seconds_remaining < 60:
                          time_remaining = f"{est_seconds_remaining:.0f} seconds"
                      elif est_seconds_remaining < 3600:
                          time_remaining = f"{est_seconds_remaining/60:.1f} minutes"
                      else:
                          time_remaining = f"{est_seconds_remaining/3600:.1f} hours"

                      recovery_info = (
                          f"Index: {index}, Shard: {shard}\n"
                          f"Stage: {stage}\n"
                          f"Progress: files={files_pct:.1f}%, bytes={bytes_pct:.1f}%\n"
                          f"Rate: {rate_mb_sec:.1f} MB/sec\n"
                          f"Est. time remaining: {time_remaining}\n"
                      )
                  else:
                      recovery_info = (
                          f"Index: {index}, Shard: {shard}\n"
                          f"Stage: {stage}\n"
                          f"Progress: files={files_pct:.1f}%, bytes={bytes_pct:.1f}%\n"
                          "Rate: calculating...\n"
                      )

                  summary.append(recovery_info)

              return [TextContent(type="text", text="\n".join(summary))]

          except Exception as e:
              self.logger.error(f"Error fetching recovery status: {e}")
              return [TextContent(type="text", text=f"Error: {str(e)}")]

- src/opensearch_mcp_server/server.py:41-41 (registration) Calls register_tools on AdminClusterTools instance, which registers the get_recovery_status tool (along with others) via its @mcp.tool decorators.

      admin_cluster_tools.register_tools(self.mcp)

- src/opensearch_mcp_server/server.py:33-33 (registration) Instantiates the AdminClusterTools class containing the get_recovery_status tool implementation.

      admin_cluster_tools = AdminClusterTools(self.logger)
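The handler's rate and time-remaining arithmetic can be isolated from the OpenSearch client and exercised on its own. The following is a minimal sketch of that calculation, not the server's code: the function names `parse_elapsed_ms`, `estimate_seconds_remaining`, and `format_remaining` are hypothetical, and the rate is kept in bytes per second rather than MB/sec, which yields the same estimate.

```python
from typing import Optional

# Suffixes are checked in order; "ms" must be tested before "s" and "m".
_UNITS_MS = (("ms", 1.0), ("s", 1000.0), ("m", 60_000.0), ("h", 3_600_000.0))


def parse_elapsed_ms(time_str: str) -> float:
    """Convert strings such as "850ms", "1.2s", "3m" or "2.5h" to milliseconds.

    Unknown or malformed values yield 0.0, mirroring the handler's fallback.
    """
    for suffix, factor in _UNITS_MS:
        if time_str.endswith(suffix):
            try:
                return float(time_str[: -len(suffix)]) * factor
            except ValueError:
                return 0.0
    return 0.0


def estimate_seconds_remaining(
    recovered: int, total: int, elapsed_ms: float
) -> Optional[float]:
    """Return estimated seconds left, or None when no rate can be computed yet."""
    if recovered <= 0 or elapsed_ms <= 0:
        return None
    rate_bytes_per_sec = recovered / (elapsed_ms / 1000)
    # Clamp at zero in case the reported total lags behind the recovered count.
    return max(total - recovered, 0) / rate_bytes_per_sec


def format_remaining(seconds: float) -> str:
    """Render a duration with the same thresholds the handler uses."""
    if seconds < 60:
        return f"{seconds:.0f} seconds"
    if seconds < 3600:
        return f"{seconds / 60:.1f} minutes"
    return f"{seconds / 3600:.1f} hours"


# Example: 50 MiB of 200 MiB recovered after 10 seconds.
eta = estimate_seconds_remaining(
    50 * 1024 * 1024, 200 * 1024 * 1024, parse_elapsed_ms("10s")
)
print(format_remaining(eta))
```

The estimate assumes the average rate so far continues to hold, so it will drift if recovery throttling or network conditions change mid-recovery.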