Skip to main content
Glama
skeleton_service.py15.5 kB
""" Skeleton generation service. Handles orchestration of skeleton building, relationship gathering, and metrics calculation independent of MCP layer. """ from typing import Dict, Any, Optional, List from dataclasses import dataclass from pathlib import Path import logging from nabu.services.base import BaseService from nabu.core.skeleton_builder import SkeletonBuilder, SkeletonOptions from nabu.language_handlers.formatters import formatter_registry logger = logging.getLogger(__name__) @dataclass class SkeletonRequest: """Request parameters for skeleton generation.""" target_frame_data: Dict[str, Any] detail_level: str = "minimal" structure_detail_depth: int = 1 include_docstrings: bool = False include_private: bool = True max_recursion_depth: int = 1 include_relationships: bool = False include_metrics: bool = False max_callers: int = 10 @dataclass class SkeletonResult: """Domain result from skeleton generation.""" skeleton: str metadata: Dict[str, Any] relationships: Optional[Dict[str, Any]] = None metrics: Optional[Dict[str, Any]] = None class SkeletonService(BaseService): """ Service for generating frame skeletons. Handles orchestration of skeleton building, relationship gathering, and metrics calculation independent of MCP layer. """ def __init__(self, db_manager): super().__init__(db_manager) self._builder = None @property def builder(self): """Lazy-initialize skeleton builder.""" if self._builder is None: from nabu.core.skeleton_builder import SkeletonBuilder self._builder = SkeletonBuilder(self.db_manager) return self._builder async def generate_skeleton(self, request: SkeletonRequest) -> SkeletonResult: """ Generate skeleton for a frame with optional relationships and metrics. Args: request: SkeletonRequest with all parameters Returns: SkeletonResult with skeleton string and metadata Raises: ValueError: If frame data invalid or formatter not found """ frame_data = request.target_frame_data frame_type = frame_data.get("type") language = frame_data.get("language") # Validate language support if not language: raise ValueError(f"Frame has no language information") formatter = formatter_registry.get_formatter(language) if formatter is None: supported = formatter_registry.get_supported_languages() raise ValueError( f"Language '{language}' not supported for skeleton generation. " f"Supported: {', '.join(supported)}" ) # Build skeleton options options = SkeletonOptions( detail_level=request.detail_level, include_docstrings=request.include_docstrings, structure_detail_depth=request.structure_detail_depth ) # Generate skeleton skeleton_result = await self.builder.build_recursive_skeleton( frame_data=frame_data, options=options, current_depth=0, max_recursion_depth=request.max_recursion_depth, include_private=request.include_private ) # Build metadata skeleton = skeleton_result["skeleton"] estimated_tokens = len(skeleton) // 4 # Rough estimate children_count = len(skeleton_result.get("children", [])) metadata = { "frame_type": frame_type, "name": frame_data["name"], "qualified_name": frame_data["qualified_name"], "file_path": frame_data["file_path"], "location": frame_data["location"], "language": language, "detail_level": request.detail_level, "recursion_depth": request.max_recursion_depth, "children_count": children_count, "estimated_tokens": estimated_tokens } # Gather relationships if requested (CLASS frames only) relationships = None if request.include_relationships and frame_type == "CLASS": relationships = await self._gather_relationships( frame_data, request.max_callers ) # Calculate metrics if requested (CLASS frames only) metrics = None if request.include_metrics and frame_type == "CLASS": metrics = await self._calculate_metrics( frame_data, relationships ) return SkeletonResult( skeleton=skeleton, metadata=metadata, relationships=relationships, metrics=metrics ) async def generate_multi_skeleton( self, frame_data_list: List[Dict[str, Any]], detail_level: str = "minimal", structure_detail_depth: int = 1, include_docstrings: bool = False, include_private: bool = True, max_recursion_depth: int = 1 ) -> List[Dict[str, Any]]: """ Generate skeletons for multiple frames. Args: frame_data_list: List of frame data dicts detail_level: Skeleton detail level structure_detail_depth: Control flow nesting depth include_docstrings: Whether to include docstrings include_private: Whether to include private members max_recursion_depth: Maximum recursion depth Returns: List of skeleton dicts with metadata """ all_skeletons = [] for frame_data in frame_data_list: frame_type = frame_data.get("type") language = frame_data.get("language") if not language: logger.warning( f"Skipping frame {frame_data.get('qualified_name')} " f"- no language information" ) continue formatter = formatter_registry.get_formatter(language) if not formatter: logger.warning( f"Skipping frame {frame_data.get('qualified_name')} " f"- language '{language}' not supported" ) continue # Build skeleton for this frame options = SkeletonOptions( detail_level=detail_level, include_docstrings=include_docstrings, structure_detail_depth=structure_detail_depth ) skeleton_result = await self.builder.build_recursive_skeleton( frame_data=frame_data, options=options, current_depth=0, max_recursion_depth=max_recursion_depth, include_private=include_private ) all_skeletons.append({ "name": frame_data["name"], "qualified_name": frame_data["qualified_name"], "type": frame_type, "location": frame_data["location"], "language": language, "skeleton": skeleton_result["skeleton"] }) return all_skeletons async def _gather_relationships( self, frame_data: Dict[str, Any], max_callers: int ) -> Dict[str, Any]: """ Gather inheritance, callers, dependencies for a CLASS frame. Args: frame_data: Frame data dict with 'id' field max_callers: Maximum number of callers to return Returns: Dict with inheritance, called_by, dependencies """ class_id = frame_data['id'] relationships = {} # Get inheritance relationships inheritance_query = """ MATCH (c:Frame {id: $class_id}) OPTIONAL MATCH (c)-[:Edge {type: 'INHERITS'}]->(parent:Frame) OPTIONAL MATCH (child:Frame)-[:Edge {type: 'INHERITS'}]->(c) RETURN collect(DISTINCT { name: parent.name, qualified_name: parent.qualified_name, file_path: parent.file_path }) as parents, collect(DISTINCT { name: child.name, qualified_name: child.qualified_name, file_path: child.file_path, start_line: child.start_line, end_line: child.end_line }) as children """ inheritance_result = self.db_manager.execute( inheritance_query, {"class_id": class_id} ) inheritance_df = inheritance_result.get_as_df() inheritance_data = {"parents": [], "children": []} if not inheritance_df.empty: parents = inheritance_df.iloc[0]['parents'] children = inheritance_df.iloc[0]['children'] for p in parents: if p['name']: # Filter out nulls inheritance_data["parents"].append({ "name": p['name'], "qualified_name": p['qualified_name'], "file_path": p['file_path'] }) for c in children: if c['name']: # Filter out nulls inheritance_data["children"].append({ "name": c['name'], "qualified_name": c['qualified_name'], "location": f"{Path(c['file_path']).name}:{c['start_line']}-{c['end_line']}" }) relationships["inheritance"] = inheritance_data # Get callers callers_query = """ MATCH (c:Frame {id: $class_id})-[:Edge {type: 'CONTAINS'}]->(m:Frame {type: 'CALLABLE'})<-[e:Edge {type: 'CALLS'}]-(caller:Frame) RETURN caller.name, caller.qualified_name, caller.file_path, caller.start_line, e.confidence, m.name as called_method ORDER BY e.confidence DESC LIMIT $max_callers """ callers_result = self.db_manager.execute(callers_query, { "class_id": class_id, "max_callers": max_callers }) callers_df = callers_result.get_as_df() callers = [] for _, row in callers_df.iterrows(): callers.append({ "caller_name": row['caller.name'], "caller_qualified_name": row['caller.qualified_name'], "location": f"{Path(row['caller.file_path']).name}:{row['caller.start_line']}", "confidence": float(row['e.confidence']), "called_method": row['called_method'] }) relationships["called_by"] = callers # Get dependencies deps_query = """ MATCH (c:Frame {id: $class_id})-[e:Edge]->(dep:Frame) WHERE e.type IN ['CALLS', 'USES', 'IMPORTS'] RETURN dep.name, dep.qualified_name, dep.type, dep.file_path, e.type as edge_type LIMIT 20 """ deps_result = self.db_manager.execute(deps_query, {"class_id": class_id}) deps_df = deps_result.get_as_df() dependencies = [] for _, row in deps_df.iterrows(): dependencies.append({ "name": row['dep.name'], "qualified_name": row['dep.qualified_name'], "type": row['dep.type'], "relationship": row['edge_type'], "file_path": row['dep.file_path'] }) relationships["dependencies"] = dependencies return relationships async def _calculate_metrics( self, frame_data: Dict[str, Any], relationships: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Calculate complexity metrics for a CLASS frame. Args: frame_data: Frame data dict with 'id' and field info relationships: Optional relationships data (if already gathered) Returns: Dict with method counts, field counts, complexity rating """ class_id = frame_data['id'] language = frame_data.get('language', 'python') # Get full frame details from database frame_query = """ MATCH (f:Frame {id: $frame_id}) RETURN f.name, f.qualified_name, f.file_path, f.start_line, f.end_line, f.instance_fields, f.static_fields """ frame_result = self.db_manager.execute(frame_query, {"frame_id": class_id}) frame_df = frame_result.get_as_df() if frame_df.empty: # Fallback to basic info from frame_data instance_fields = [] static_fields = [] else: row = frame_df.iloc[0] instance_fields = row.get('f.instance_fields', []) or [] static_fields = row.get('f.static_fields', []) or [] # Get method count methods_query = """ MATCH (c:Frame {id: $class_id})-[:Edge {type: 'CONTAINS'}]->(m:Frame {type: 'CALLABLE'}) RETURN count(m) as method_count """ methods_result = self.db_manager.execute(methods_query, {"class_id": class_id}) methods_df = methods_result.get_as_df() method_count = int(methods_df.iloc[0]['method_count']) if not methods_df.empty else 0 # Get caller and dependency counts (from relationships if available) if relationships: caller_count = len(relationships.get("called_by", [])) dep_count = len(relationships.get("dependencies", [])) parent_count = len(relationships.get("inheritance", {}).get("parents", [])) child_count = len(relationships.get("inheritance", {}).get("children", [])) else: caller_count = 0 dep_count = 0 parent_count = 0 child_count = 0 # Calculate complexity rating complexity_rating = self._calculate_complexity_rating( method_count, caller_count, dep_count ) return { "method_count": method_count, "instance_field_count": len(instance_fields), "static_field_count": len(static_fields), "parent_count": parent_count, "child_count": child_count, "caller_count": caller_count, "dependency_count": dep_count, "complexity_rating": complexity_rating } def _calculate_complexity_rating( self, method_count: int, caller_count: int, dep_count: int ) -> str: """ Calculate complexity rating based on metrics. Args: method_count: Number of methods in class caller_count: Number of callers dep_count: Number of dependencies Returns: Complexity rating string (LOW, MEDIUM, HIGH, VERY HIGH) """ # Weighted complexity score complexity_score = ( method_count * 0.4 + # Methods contribute most caller_count * 0.3 + # Callers indicate coupling dep_count * 0.3 # Dependencies indicate complexity ) if complexity_score > 50: return "VERY HIGH" elif complexity_score > 25: return "HIGH" elif complexity_score > 10: return "MEDIUM" else: return "LOW"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/y3i12/nabu_nisaba'

If you have feedback or need assistance with the MCP directory API, please join our Discord server