Skip to main content
Glama
pool.py (6.49 kB)
"""Pool-related API endpoints.""" from collections import defaultdict from ceph_mcp.api.base import BaseCephClient, CephAPIError from ceph_mcp.models.pool import ( Pool, PoolOptions, PoolStateSummary, PoolSummary, PoolTypeSummary, ) class PoolClient(BaseCephClient): # pylint: disable=too-few-public-methods """Client for Ceph pool-related operations.""" async def get_pool_summary(self) -> PoolSummary: """Retrieve summary information about all pools in the cluster.""" try: response_data = await self._make_request( "/api/pool?stats=true", accept_header="application/vnd.ceph.api.v1.0+json", ) # Response should be a list of pool objects pools_data = response_data if isinstance(response_data, list) else [] pools = [] for pool_data in pools_data: pools.append(self._parse_pool_data(pool_data)) # Calculate summary statistics total_pools = len(pools) replicated_pools = len([pool for pool in pools if pool.is_replicated()]) erasure_pools = len([pool for pool in pools if pool.is_erasure()]) # Group by pool type pool_types = {} type_groups = defaultdict(list) for pool in pools: type_groups[pool.type].append(pool) for pool_type, type_pools in type_groups.items(): pool_types[pool_type] = PoolTypeSummary( pool_type=pool_type, count=len(type_pools), pool_names=[p.pool_name for p in type_pools], ) # Aggregate PG status across all pools total_pgs = sum(pool.get_total_pgs() for pool in pools) pg_state_aggregation = defaultdict( lambda: {"pool_count": 0, "total_pgs": 0} ) for pool in pools: pool_states = pool.get_pg_states() for state in pool_states: pg_state_aggregation[state]["pool_count"] += 1 pg_state_aggregation[state]["total_pgs"] += pool.pg_status.get( state, 0 ) # Convert to PoolStateSummary objects pg_states = {} for state, data in pg_state_aggregation.items(): pg_states[state] = PoolStateSummary( state=state, pool_count=data["pool_count"], total_pgs=data["total_pgs"], ) # Calculate health statistics healthy_pools = len([pool for pool in pools if pool.is_healthy()]) 
unhealthy_pools = total_pools - healthy_pools return PoolSummary( total_pools=total_pools, replicated_pools=replicated_pools, erasure_pools=erasure_pools, pool_types=pool_types, total_pgs=total_pgs, pg_states=pg_states, healthy_pools=healthy_pools, unhealthy_pools=unhealthy_pools, pools=pools, ) except Exception as e: self.logger.error("Failed to retrieve pool summary", error=str(e)) raise CephAPIError(f"Failed to get pool summary: {str(e)}") from e async def get_pool_details(self, pool_name: str) -> Pool: """Retrieve detailed information about a specific pool.""" try: # Get all pools first (since there's no single pool endpoint) pool_summary = await self.get_pool_summary() # Find the specific pool pool = pool_summary.get_pool_by_name(pool_name) if not pool: available_pools = pool_summary.get_pool_names() raise CephAPIError( f"Pool '{pool_name}' not found. Available pools: {', '.join(available_pools)}" ) return pool except CephAPIError: # Re-raise CephAPIError as-is raise except Exception as e: self.logger.error( "Failed to retrieve pool details", pool_name=pool_name, error=str(e) ) raise CephAPIError( f"Failed to get pool details for '{pool_name}': {str(e)}" ) from e def _parse_pool_data(self, pool_data: dict) -> Pool: """Convert raw pool data to Pool model.""" try: # Parse pool options options_data = pool_data.get("options", {}) options = PoolOptions( pg_num_max=options_data.get("pg_num_max", 32), pg_num_min=options_data.get("pg_num_min", 1), ) return Pool( pool_name=pool_data.get("pool_name", "unknown"), type=pool_data.get("type", "unknown"), size=pool_data.get("size", 0), min_size=pool_data.get("min_size", 0), crush_rule=pool_data.get("crush_rule", ""), pg_num=pool_data.get("pg_num", 0), pg_placement_num=pool_data.get("pg_placement_num", 0), pg_placement_num_target=pool_data.get("pg_placement_num_target", 0), pg_num_target=pool_data.get("pg_num_target", 0), options=options, application_metadata=pool_data.get("application_metadata", []), 
pg_status=pool_data.get("pg_status", {}), ) except Exception as e: self.logger.error( "Failed to parse pool data", pool_data=pool_data, error=str(e) ) # Return a minimal pool object with whatever we can extract return Pool( pool_name=pool_data.get("pool_name", "unknown"), type=pool_data.get("type", "unknown"), size=pool_data.get("size", 0), min_size=pool_data.get("min_size", 0), crush_rule=pool_data.get("crush_rule", ""), pg_num=pool_data.get("pg_num", 0), pg_placement_num=pool_data.get("pg_placement_num", 0), pg_placement_num_target=pool_data.get("pg_placement_num_target", 0), pg_num_target=pool_data.get("pg_num_target", 0), options=PoolOptions(), application_metadata=[], pg_status={}, )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rajmohanram/ceph-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.