Skip to main content
Glama
johannhartmann

MCP Code Analysis Server

package_analyzer.py (17.5 kB)
"""Package structure analyzer for Python projects."""

from collections import defaultdict
from pathlib import Path
from typing import Any, cast

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from src.database.models import Class, File, Function, Import, Module, Repository
from src.database.package_models import (
    Package,
    PackageDependency,
    PackageMetrics,
    PackageModule,
)
from src.logger import get_logger

logger = get_logger(__name__)


class PackageAnalyzer:
    """Analyzes package structure and dependencies within a repository.

    Packages are keyed by their directory path. The repository root is always
    represented by the empty string ``""`` (never ``"."``), and paths are
    compared using ``"/"`` separators.
    """

    def __init__(self, db_session: AsyncSession, repository_id: int) -> None:
        self.db_session = db_session
        self.repository_id = repository_id
        # Package path -> Package record created during this analysis.
        self._package_cache: dict[str, Package] = {}
        # File path -> File record; filled with the repository's Python files.
        self._file_cache: dict[str, File] = {}

    @staticmethod
    def _parent_dir(path: str) -> str:
        """Return the parent directory of *path*, using ``""`` for the root.

        ``pathlib`` reports the parent of a top-level relative path as ``"."``;
        every lookup in this class keys the root package as ``""``, so the two
        spellings are unified here.
        """
        parent = str(Path(path).parent)
        return "" if parent == "." else parent

    @staticmethod
    def _is_init_file(path: str) -> bool:
        """Return True when *path* names a file called exactly ``__init__.py``."""
        return Path(path).name == "__init__.py"

    async def analyze_packages(self) -> dict[str, Any]:
        """Analyze the complete package structure of the repository.

        Returns:
            A dict with ``packages_found``, ``total_files`` and ``package_tree``.

        Raises:
            ValueError: If the repository does not exist.
        """
        logger.info(
            "Starting package structure analysis for repository %d", self.repository_id
        )

        # Get repository info
        repo = await self.db_session.get(Repository, self.repository_id)
        if not repo:
            msg = f"Repository {self.repository_id} not found"
            raise ValueError(msg)

        # Find all Python files
        result = await self.db_session.execute(
            select(File).where(
                File.repository_id == self.repository_id,
                File.language == "python",
                ~File.is_deleted,
            )
        )
        python_files = list(result.scalars().all())

        # Build file cache
        self._file_cache = {cast("str", f.path): f for f in python_files}

        # Find all packages (directories with __init__.py)
        packages = await self._discover_packages(python_files)

        # Create package hierarchy
        await self._create_package_hierarchy(packages)

        # Analyze package contents
        await self._analyze_package_contents()

        # Analyze dependencies between packages
        await self._analyze_package_dependencies()

        # Calculate package metrics
        await self._calculate_package_metrics()

        # Commit all changes
        await self.db_session.commit()

        return {
            "packages_found": len(self._package_cache),
            "total_files": len(python_files),
            "package_tree": await self._build_package_tree(),
        }

    async def _discover_packages(  # noqa: PLR0912
        self, python_files: list[File]
    ) -> dict[str, dict[str, Any]]:
        """Discover all packages in the repository.

        Returns:
            A mapping of package path to an info dict with the keys ``name``,
            ``path``, ``init_file``, ``modules`` and ``subpackages`` (plus
            ``is_namespace`` for directories without ``__init__.py``).
        """
        packages: dict[str, dict[str, Any]] = {}

        # Regular packages: every directory that holds an __init__.py.
        for init_file in python_files:
            if not self._is_init_file(init_file.path):
                continue
            package_path = self._parent_dir(init_file.path)
            packages[package_path] = {
                # The root directory has no name of its own.
                "name": Path(package_path).name if package_path else "root",
                "path": package_path,
                "init_file": init_file,
                "modules": [],
                "subpackages": [],
            }

        # Namespace packages (PEP 420): directories that directly contain
        # Python files but no __init__.py. The root is never treated as one.
        # Sorted so that record creation order is deterministic.
        direct_parents = {self._parent_dir(f.path) for f in python_files}
        for dir_path in sorted(direct_parents):
            if not dir_path or dir_path in packages:
                continue
            packages[dir_path] = {
                "name": Path(dir_path).name,
                "path": dir_path,
                "init_file": None,
                "is_namespace": True,
                "modules": [],
                "subpackages": [],
            }

        # Attach each non-__init__ file to the package of its own directory.
        for file in python_files:
            if self._is_init_file(file.path):
                continue
            parent_path = self._parent_dir(file.path)
            if parent_path in packages:
                modules_list = cast(
                    "list[dict[str, Any]]", packages[parent_path]["modules"]
                )
                modules_list.append({"name": Path(file.path).stem, "file": file})

        # Build parent-child relationships (deepest paths first).
        for pkg_path in sorted(packages.keys(), key=len, reverse=True):
            if not pkg_path:
                continue  # the root has no parent
            parent_path = self._parent_dir(pkg_path)
            if parent_path in packages and parent_path != pkg_path:
                subpackages_list = cast(
                    "list[str]", packages[parent_path]["subpackages"]
                )
                subpackages_list.append(pkg_path)

        return packages

    async def _create_package_hierarchy(
        self, packages: dict[str, dict[str, Any]]
    ) -> None:
        """Create package records and establish hierarchy.

        Each record is flushed immediately so its ``id`` is available as
        ``parent_id`` for the packages created after it.
        """
        readme_names = ["README.md", "README.rst", "README.txt", "readme.md"]

        # A parent path is always shorter than its children, so sorting by
        # length guarantees parents are created first.
        for pkg_path in sorted(packages.keys(), key=len):
            pkg_info = packages[pkg_path]
            init_file = pkg_info.get("init_file")

            # Find parent package
            parent_id = None
            if pkg_path:
                parent = self._package_cache.get(self._parent_dir(pkg_path))
                if parent is not None:
                    parent_id = parent.id

            # Create package record
            package = Package(
                repository_id=self.repository_id,
                path=pkg_path,
                name=pkg_info["name"],
                parent_id=parent_id,
                init_file_id=init_file.id if init_file else None,
                has_init=bool(init_file),
                is_namespace=pkg_info.get("is_namespace", False),
            )

            # Extract docstring from __init__.py if available
            if init_file:
                module_result = await self.db_session.execute(
                    select(Module).where(Module.file_id == init_file.id)
                )
                module = module_result.scalar_one_or_none()
                if module:
                    package.docstring = module.docstring

            self.db_session.add(package)
            await self.db_session.flush()  # Get ID

            self._package_cache[pkg_path] = package

            # Check for README
            # NOTE(review): _file_cache is filled only from files with
            # language == "python", so this lookup probably never matches a
            # README; a separate query over all files may be needed - confirm.
            for readme_name in readme_names:
                readme_path = (
                    str(Path(pkg_path) / readme_name) if pkg_path else readme_name
                )
                if readme_path in self._file_cache:
                    package.readme_file_id = self._file_cache[readme_path].id
                    break

    async def _analyze_package_contents(self) -> None:
        """Analyze the contents of each package.

        Adds a ``PackageModule`` row for every non-``__init__`` file located
        directly in a package and stores the direct module and subpackage
        counts on the package.
        """
        # Group files by directory once instead of rescanning per package.
        modules_by_dir: dict[str, list[tuple[str, File]]] = defaultdict(list)
        for file_path, file in self._file_cache.items():
            if self._is_init_file(file_path):
                continue
            modules_by_dir[self._parent_dir(file_path)].append(
                (Path(file_path).stem, file)
            )

        for pkg_path, package in self._package_cache.items():
            direct_modules = modules_by_dir.get(pkg_path, [])

            for module_name, file in direct_modules:
                # TODO(dev): Extract __all__ from module (would need AST analysis)
                self.db_session.add(
                    PackageModule(
                        package_id=package.id,
                        file_id=file.id,
                        module_name=module_name,
                        is_public=not module_name.startswith("_"),
                    )
                )

            # Count subpackages
            subpackage_count = sum(
                1 for p in self._package_cache.values() if p.parent_id == package.id
            )

            # Update package statistics
            cast("Any", package).module_count = len(direct_modules)
            cast("Any", package).subpackage_count = subpackage_count

    async def _analyze_package_dependencies(self) -> None:
        """Analyze dependencies between packages.

        Groups the repository's imports by (source package, target package)
        and adds one ``PackageDependency`` per pair. Imports that resolve to
        the importing package itself are ignored.
        """
        # Get all imports in the repository
        result = await self.db_session.execute(
            select(Import, File)
            .join(File)
            .where(File.repository_id == self.repository_id)
        )
        imports = result.all()

        # source package id -> target package id -> import details
        dependencies: dict[int, dict[int, list[dict[str, Any]]]] = defaultdict(
            lambda: defaultdict(list)
        )

        for import_obj, file in imports:
            if not import_obj.module_name:
                continue

            # Find source package
            source_package = self._find_package_for_file(file.path)
            if not source_package:
                continue

            # Parse import to find target package
            target_package = self._find_package_by_import(import_obj.module_name)
            if not target_package:
                continue

            sid = cast("int", source_package.id)
            tid = cast("int", target_package.id)
            if tid == sid:
                continue

            dependencies[sid][tid].append(
                {
                    "module": import_obj.module_name,
                    "names": import_obj.imported_names,
                    "line": import_obj.line_number,
                }
            )

        # Create dependency records
        for source_id, targets in dependencies.items():
            for target_id, import_list in targets.items():
                dep = PackageDependency(
                    source_package_id=source_id,
                    target_package_id=target_id,
                    import_count=len(import_list),
                    dependency_type="direct",
                    import_details=import_list,
                )
                self.db_session.add(dep)

    def _find_package_for_file(self, file_path: str) -> Package | None:
        """Find which package a file belongs to.

        Walks from the file's own directory up to the repository root and
        returns the first directory that is a known package, or ``None``.
        """
        for parent in Path(file_path).parents:
            key = str(parent)
            if key == ".":
                key = ""  # the root package is cached under ""
            package = self._package_cache.get(key)
            if package is not None:
                return package
        return None

    def _find_package_by_import(self, module_name: str) -> Package | None:
        """Find package by import name.

        Tries the longest dotted prefix of *module_name* as a package path
        first, then falls back to the first package whose name equals the
        leading component.
        """
        parts = module_name.split(".")

        # Try to match against package paths
        for i in range(len(parts), 0, -1):
            potential_path = "/".join(parts[:i])
            if potential_path in self._package_cache:
                return self._package_cache[potential_path]

        # Try to match against package names
        for package in self._package_cache.values():
            if package.name == parts[0]:
                return package

        return None

    async def _calculate_package_metrics(self) -> None:
        """Calculate metrics for each package.

        Metrics cover the package's own files and those of all its
        subpackages. A ``PackageMetrics`` row is added for every package,
        including packages without any files.
        """
        for package in self._package_cache.values():
            metrics = PackageMetrics(package_id=package.id)

            # Get all files in package and subpackages
            package_path = cast("str", package.path)
            package_files = [
                file
                for file_path, file in self._file_cache.items()
                if self._is_file_in_package(file_path, package_path)
            ]

            if not package_files:
                self.db_session.add(metrics)
                continue

            # Get modules
            file_ids = [f.id for f in package_files]
            module_result = await self.db_session.execute(
                select(Module).where(Module.file_id.in_(file_ids))
            )
            module_ids = [m.id for m in module_result.scalars().all()]

            # Fetch classes and functions for all modules in two queries
            # rather than two queries per module.
            all_classes: list[Any] = []
            all_functions: list[Any] = []
            if module_ids:
                class_result = await self.db_session.execute(
                    select(Class).where(Class.module_id.in_(module_ids))
                )
                all_classes = list(class_result.scalars().all())

                func_result = await self.db_session.execute(
                    select(Function).where(Function.module_id.in_(module_ids))
                )
                all_functions = list(func_result.scalars().all())

            # NOTE(review): File.size is summed and stored as the line count;
            # confirm that size is measured in lines rather than bytes.
            total_lines = sum(f.size or 0 for f in package_files)
            total_classes = len(all_classes)
            total_functions = len(all_functions)

            # Functions without a recorded complexity are skipped here but
            # still count towards total_functions in the average below.
            complexities = [
                value
                for value in (
                    cast("int | None", func.complexity) for func in all_functions
                )
                if value is not None
            ]
            total_complexity = sum(complexities)
            max_complexity = max([0, *complexities])

            # Update package stats
            cast("Any", package).total_lines = total_lines
            cast("Any", package).total_functions = total_functions
            cast("Any", package).total_classes = total_classes

            # Update metrics
            cast("Any", metrics).total_loc = total_lines
            cast("Any", metrics).total_complexity = total_complexity
            cast("Any", metrics).max_complexity = max_complexity
            if total_functions > 0:
                cast("Any", metrics).avg_complexity = int(
                    total_complexity // total_functions
                )

            # Check for tests ("test" also covers paths containing "tests")
            cast("Any", metrics).has_tests = any(
                "test" in f.path for f in package_files
            )

            # Check for docs
            cast("Any", metrics).has_docs = bool(
                package.readme_file_id or package.docstring
            )

            # Count public API: non-underscore classes, and non-underscore
            # functions that are not attached to a class.
            cast("Any", metrics).public_classes = sum(
                1 for cls in all_classes if not cls.name.startswith("_")
            )
            cast("Any", metrics).public_functions = sum(
                1
                for func in all_functions
                if not func.name.startswith("_") and not func.class_id
            )

            self.db_session.add(metrics)

    def _is_file_in_package(self, file_path: str, package_path: str) -> bool:
        """Check if a file belongs to a package or its subpackages."""
        if not package_path:  # Root package contains every file
            return True
        return file_path.startswith(package_path + "/")

    async def _build_package_tree(self) -> dict[str, Any]:
        """Build a tree representation of the package structure.

        Returns:
            ``{"packages": [...]}`` where each node carries ``name``, ``path``,
            ``modules``, ``subpackages``, ``has_init``, ``is_namespace`` and a
            recursive ``children`` list.
        """
        # Index children by parent id once so each node lookup is O(1).
        children_by_parent: dict[Any, list[Package]] = defaultdict(list)
        for pkg in self._package_cache.values():
            if pkg.parent_id is not None:
                children_by_parent[pkg.parent_id].append(pkg)

        def build_node(package: Package) -> dict[str, Any]:
            return {
                "name": package.name,
                "path": package.path,
                "modules": package.module_count,
                "subpackages": package.subpackage_count,
                "has_init": package.has_init,
                "is_namespace": package.is_namespace,
                "children": [
                    build_node(child)
                    for child in children_by_parent.get(package.id, [])
                ],
            }

        # Find root packages
        roots = [p for p in self._package_cache.values() if p.parent_id is None]

        return {"packages": [build_node(root) for root in roots]}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/johannhartmann/mcpcodeanalysis'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.