Skip to main content
Glama
osd.py9.1 kB
"""OSD-related API endpoints.""" from collections import defaultdict from ceph_mcp.api.base import BaseCephClient, CephAPIError from ceph_mcp.models.osd import ( OSD, DeviceClassSummary, Host, OSDIdInfo, OSDStats, OSDSummary, PerfStat, Tree, ) class OSDClient(BaseCephClient): # pylint: disable=too-few-public-methods """Client for Ceph OSD-related operations.""" async def get_osd_summary(self) -> OSDSummary: """Retrieve summary information about all OSDs in the cluster.""" try: response_data = await self._make_request( "/api/osd?limit=-1", accept_header="application/vnd.ceph.api.v1.1+json", ) # Response should be a list of OSD objects osds_data = response_data if isinstance(response_data, list) else [] osds = [] for osd_data in osds_data: osds.append(self._parse_osd_data(osd_data)) # Calculate summary statistics total_osds = len(osds) up_osds = len([osd for osd in osds if osd.is_up()]) down_osds = total_osds - up_osds in_osds = len([osd for osd in osds if osd.is_in()]) out_osds = total_osds - in_osds working_osds = len([osd for osd in osds if osd.is_working()]) # Get unique hosts unique_hosts = list({osd.get_hostname() for osd in osds}) # Get unique device classes device_classes = list({osd.get_device_class() for osd in osds}) # Group by device class for summary device_class_summary = {} device_groups = defaultdict(list) for osd in osds: device_groups[osd.get_device_class()].append(osd) for device_class, class_osds in device_groups.items(): total_pgs = sum(osd.osd_stats.num_pgs for osd in class_osds) total_capacity = sum(osd.osd_stats.kb for osd in class_osds) total_used = sum(osd.osd_stats.kb_used for osd in class_osds) total_available = sum(osd.osd_stats.kb_avail for osd in class_osds) device_class_summary[device_class] = DeviceClassSummary( device_class=device_class, osd_count=len(class_osds), total_pgs=total_pgs, total_capacity_kb=total_capacity, total_used_kb=total_used, total_available_kb=total_available, ) return OSDSummary( total_osds=total_osds, up_osds=up_osds, down_osds=down_osds, in_osds=in_osds, out_osds=out_osds, working_osds=working_osds, unique_hosts=unique_hosts, device_classes=device_classes, device_class_summary=device_class_summary, osds=osds, ) except Exception as e: self.logger.error("Failed to retrieve OSD summary", error=str(e)) raise CephAPIError(f"Failed to get OSD summary: {str(e)}") from e async def get_osd_ids(self) -> OSDIdInfo: """Retrieve list of all OSD IDs and their hosts.""" try: # Get all OSDs first osd_summary = await self.get_osd_summary() # Extract OSD ID and host information osd_ids = [ {"osd_id": osd.osd, "hostname": osd.get_hostname()} for osd in osd_summary.osds ] return OSDIdInfo(osd_ids=osd_ids, total_count=len(osd_ids)) except Exception as e: self.logger.error("Failed to retrieve OSD IDs", error=str(e)) raise CephAPIError(f"Failed to get OSD IDs: {str(e)}") from e async def get_osd_details(self, osd_id: int) -> OSD: """Retrieve detailed information about a specific OSD.""" try: # Get all OSDs first (since there's no single OSD endpoint) osd_summary = await self.get_osd_summary() # Find the specific OSD osd = osd_summary.get_osd_by_id(osd_id) if not osd: available_ids = [str(osd.osd) for osd in osd_summary.osds] raise CephAPIError( f"OSD {osd_id} not found. Available OSD IDs: {', '.join(available_ids)}" ) return osd except CephAPIError: # Re-raise CephAPIError as-is raise except Exception as e: self.logger.error( "Failed to retrieve OSD details", osd_id=osd_id, error=str(e) ) raise CephAPIError( f"Failed to get OSD details for OSD {osd_id}: {str(e)}" ) from e async def perform_osd_mark_action(self, osd_id: int, action: str) -> dict: """Perform a mark action on a specific OSD.""" try: # Validate OSD ID if not isinstance(osd_id, int) or osd_id < 0: raise CephAPIError( f"OSD ID must be a non-negative integer, got: {osd_id}" ) # Validate action valid_actions = ["noout", "out", "in"] if action not in valid_actions: raise CephAPIError( f"Invalid action '{action}'. Valid actions: {', '.join(valid_actions)}" ) # Prepare request payload action_payload = {"action": action} response_data = await self._make_request( f"/api/osd/{osd_id}/mark", accept_header="application/vnd.ceph.api.v1.0+json", method="PUT", json_data=action_payload, ) return { "osd_id": osd_id, "action": action, "success": True, "response": response_data, } except CephAPIError: # Re-raise CephAPIError as-is raise except Exception as e: self.logger.error( "Failed to perform OSD mark action", osd_id=osd_id, action=action, error=str(e), ) raise CephAPIError( f"Failed to mark OSD {osd_id} as {action}: {str(e)}" ) from e def _parse_osd_data(self, osd_data: dict) -> OSD: """Convert raw OSD data to OSD model.""" try: # Parse nested structures osd_stats_data = osd_data.get("osd_stats", {}) tree_data = osd_data.get("tree", {}) host_data = osd_data.get("host", {}) # Parse performance statistics perf_stat_data = osd_stats_data.get("perf_stat", {}) perf_stat = PerfStat( commit_latency_ms=perf_stat_data.get("commit_latency_ms", 0.0), apply_latency_ms=perf_stat_data.get("apply_latency_ms", 0.0), ) # Parse OSD statistics osd_stats = OSDStats( osd=osd_stats_data.get("osd", 0), num_pgs=osd_stats_data.get("num_pgs", 0), num_osds=osd_stats_data.get("num_osds", 1), kb=osd_stats_data.get("kb", 0), kb_used=osd_stats_data.get("kb_used", 0), kb_avail=osd_stats_data.get("kb_avail", 0), perf_stat=perf_stat, alerts=osd_stats_data.get("alerts", []), ) # Parse tree information tree = Tree( id=tree_data.get("id", 0), device_class=tree_data.get("device_class", ""), type=tree_data.get("type", "osd"), ) # Parse host information host = Host(name=host_data.get("name", "unknown")) return OSD( osd=osd_data.get("osd", 0), id=osd_data.get("id", 0), up=osd_data.get("up", 0), **{"in": osd_data.get("in", 0)}, weight=osd_data.get("weight", 1.0), operational_status=osd_data.get("operational_status", ""), osd_stats=osd_stats, tree=tree, host=host, ) except Exception as e: # pylint: disable=broad-except self.logger.error( "Failed to parse OSD data", osd_data=osd_data, error=str(e) ) # Return a minimal OSD object with whatever we can extract return OSD( osd=osd_data.get("osd", 0), id=osd_data.get("id", 0), up=osd_data.get("up", 0), **{"in": osd_data.get("in", 0)}, weight=osd_data.get("weight", 1.0), operational_status=osd_data.get("operational_status", ""), osd_stats=OSDStats(osd=osd_data.get("osd", 0)), tree=Tree(id=osd_data.get("id", 0)), host=Host(name="unknown"), )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rajmohanram/ceph-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server