OpenSearch MCP Server

by seohyunjun

get_recovery_status

Check recovery status and estimated completion time for OpenSearch cluster operations to monitor progress and plan next steps.

Instructions

Get recovery status and estimated completion time

Input Schema

Name | Required | Description | Default

No arguments
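
Because the tool takes no arguments, a client invokes it with an empty `arguments` object. The sketch below builds such a request in the JSON-RPC 2.0 shape that MCP uses for `tools/call`. The helper name `build_tool_call` and the request id are illustrative assumptions, not part of this server.

```python
import json


def build_tool_call(tool_name: str, request_id: int = 1) -> str:
    """Serialize an MCP tools/call request for a tool that takes no arguments."""
    request = {
        "jsonrpc": "2.0",
        "id": request_id,          # arbitrary id chosen by the client
        "method": "tools/call",
        "params": {
            "name": tool_name,
            "arguments": {},       # get_recovery_status has an empty input schema
        },
    }
    return json.dumps(request)


print(build_tool_call("get_recovery_status"))
```

In practice an MCP client library would normally build and send this message for you; the sketch only shows what goes over the wire.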

Implementation Reference

  • The main handler function for the 'get_recovery_status' tool. It fetches active shard recoveries from OpenSearch using the cat.recovery API, then calculates progress (percentage of files and bytes recovered), the recovery rate (MB/sec), and the estimated time remaining. If there are no active recoveries, it falls back to a cluster health summary. It returns formatted text as list[TextContent].
    async def get_recovery_status() -> list[TextContent]:
        """
        Get recovery status for shards that are currently being recovered.
        Includes progress percentage and estimated time remaining based on current recovery rate.
        """
        self.logger.info("Fetching recovery status...")
        try:
            # Get active recoveries; bytes='b' requests raw byte counts so they parse as integers
            response = self.es_client.cat.recovery(format='json', active_only=True, bytes='b')
            
            if not response:
                # Get cluster health to show overall shard status if no active recoveries
                health = self.es_client.cluster.health()
                total_shards = health['active_shards'] + health['unassigned_shards'] + health['initializing_shards']
                active_pct = (health['active_shards'] / total_shards) * 100 if total_shards > 0 else 100
                
                status_msg = (
                    f"No active recoveries. Cluster status: {health['status']}\n"
                    f"Active shards: {health['active_shards']}/{total_shards} ({active_pct:.1f}%)\n"
                    f"Initializing: {health['initializing_shards']}\n"
                    f"Unassigned: {health['unassigned_shards']}"
                )
                return [TextContent(type="text", text=status_msg)]
    
            # Process active recoveries
            summary = []
            for recovery in response:
                index = recovery['index']
                shard = recovery['shard']
                stage = recovery.get('stage', 'unknown')
                
                # Calculate progress and time remaining
                files_pct = float(recovery.get('files_percent', '0').rstrip('%'))
                bytes_pct = float(recovery.get('bytes_percent', '0').rstrip('%'))
                # The cat recovery API names these columns bytes_total and bytes_recovered
                total_bytes = int(recovery.get('bytes_total', 0))
                bytes_recovered = int(recovery.get('bytes_recovered', 0))
                
                # Parse time value which can be in format like "1.2s" or "3m" or "2.5h"
                time_str = recovery.get('time', '0s')
                try:
                    # Convert time string to milliseconds
                    if time_str.endswith('ms'):
                        time_spent_ms = float(time_str[:-2])
                    elif time_str.endswith('s'):
                        time_spent_ms = float(time_str[:-1]) * 1000
                    elif time_str.endswith('m'):
                        time_spent_ms = float(time_str[:-1]) * 60 * 1000
                    elif time_str.endswith('h'):
                        time_spent_ms = float(time_str[:-1]) * 60 * 60 * 1000
                    elif time_str.endswith('d'):
                        time_spent_ms = float(time_str[:-1]) * 24 * 60 * 60 * 1000
                    else:
                        time_spent_ms = 0
                except ValueError:
                    time_spent_ms = 0
                
                # Calculate recovery rate and estimated time remaining
                if bytes_recovered > 0 and time_spent_ms > 0:
                    rate_mb_sec = (bytes_recovered / 1024 / 1024) / (time_spent_ms / 1000)
                    remaining_bytes = total_bytes - bytes_recovered
                    est_seconds_remaining = (remaining_bytes / 1024 / 1024) / rate_mb_sec if rate_mb_sec > 0 else 0
                    
                    # Format time remaining in a human-readable way
                    if est_seconds_remaining < 60:
                        time_remaining = f"{est_seconds_remaining:.0f} seconds"
                    elif est_seconds_remaining < 3600:
                        time_remaining = f"{est_seconds_remaining/60:.1f} minutes"
                    else:
                        time_remaining = f"{est_seconds_remaining/3600:.1f} hours"
                    
                    recovery_info = (
                        f"Index: {index}, Shard: {shard}\n"
                        f"Stage: {stage}\n"
                        f"Progress: files={files_pct:.1f}%, bytes={bytes_pct:.1f}%\n"
                        f"Rate: {rate_mb_sec:.1f} MB/sec\n"
                        f"Est. time remaining: {time_remaining}\n"
                    )
                else:
                    recovery_info = (
                        f"Index: {index}, Shard: {shard}\n"
                        f"Stage: {stage}\n"
                        f"Progress: files={files_pct:.1f}%, bytes={bytes_pct:.1f}%\n"
                        "Rate: calculating...\n"
                    )
                
                summary.append(recovery_info)
            
            return [TextContent(type="text", text="\n".join(summary))]
            
        except Exception as e:
            self.logger.error(f"Error fetching recovery status: {e}")
            return [TextContent(type="text", text=f"Error: {str(e)}")]
  • Calls register_tools on the AdminClusterTools instance, which registers the get_recovery_status tool (along with others) via its @mcp.tool decorators.
    admin_cluster_tools.register_tools(self.mcp)
  • Instantiates the AdminClusterTools class containing the get_recovery_status tool implementation.
    admin_cluster_tools = AdminClusterTools(self.logger)
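
The time-remaining estimate in the handler is a linear extrapolation: remaining bytes divided by the average rate observed so far. The standalone sketch below isolates that arithmetic so it can be checked without a cluster. The function names (`parse_duration_seconds`, `estimate_remaining_seconds`, `format_remaining`) are hypothetical helpers, not part of the server, and the regular expression is an alternative to the handler's chained `endswith` checks rather than the author's method.

```python
import re
from typing import Optional

# Seconds per unit for the suffixes the handler deals with, plus days.
_UNIT_SECONDS = {"ms": 0.001, "s": 1.0, "m": 60.0, "h": 3600.0, "d": 86400.0}
_TIME_RE = re.compile(r"^(\d+(?:\.\d+)?)(ms|s|m|h|d)$")


def parse_duration_seconds(text: str) -> float:
    """Convert strings such as '13ms', '1.2s', '3m' or '2.5h' to seconds.

    Returns 0.0 for anything that does not match, mirroring the handler's fallback.
    """
    match = _TIME_RE.match(text.strip())
    if not match:
        return 0.0
    value, unit = match.groups()
    return float(value) * _UNIT_SECONDS[unit]


def estimate_remaining_seconds(
    total_bytes: int, recovered_bytes: int, elapsed_seconds: float
) -> Optional[float]:
    """Remaining bytes divided by the average rate so far; None if no rate is available yet."""
    if recovered_bytes <= 0 or elapsed_seconds <= 0:
        return None
    rate = recovered_bytes / elapsed_seconds  # bytes per second
    return max(total_bytes - recovered_bytes, 0) / rate


def format_remaining(seconds: float) -> str:
    """Render a duration with the same thresholds the handler uses."""
    if seconds < 60:
        return f"{seconds:.0f} seconds"
    if seconds < 3600:
        return f"{seconds / 60:.1f} minutes"
    return f"{seconds / 3600:.1f} hours"


# Example: 250 MB of 1000 MB recovered after 2 minutes.
MB = 1024 * 1024
eta = estimate_remaining_seconds(1000 * MB, 250 * MB, parse_duration_seconds("2m"))
print(format_remaining(eta))
```

Because the rate is an average over the whole recovery so far, the estimate drifts when throughput changes, for example under recovery throttling, so it is best read as a rough guide.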

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/seohyunjun/opensearch-mcp-server'
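
The same request can be made from Python using only the standard library. This is a minimal sketch that assumes the endpoint returns a JSON body, which the page does not state; `server_url` and `fetch_server` are hypothetical helper names.

```python
import json
import urllib.request

API_BASE = "https://glama.ai/api/mcp/v1/servers"


def server_url(owner: str, name: str) -> str:
    """Build the directory URL for one server, matching the curl example."""
    return f"{API_BASE}/{owner}/{name}"


def fetch_server(owner: str, name: str, timeout: float = 10.0) -> dict:
    """GET the server record and decode it (assumes a JSON response body)."""
    request = urllib.request.Request(
        server_url(owner, name),
        method="GET",
        headers={"Accept": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)
```

Calling `fetch_server("seohyunjun", "opensearch-mcp-server")` issues the same GET request as the curl command; the fields of the returned record are not described here, so inspect the result before relying on any particular key.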

If you have feedback or need assistance with the MCP directory API, please join our Discord server.