Skip to main content
Glama
johannhartmann

MCP Code Analysis Server

pattern_analyzer.py30.3 kB
"""Analyze domain patterns and anti-patterns in codebases.""" from collections import defaultdict from datetime import UTC, datetime, timedelta from typing import Any, cast from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from src.database.domain_models import ( BoundedContext, BoundedContextMembership, DomainEntity, DomainRelationship, ) from src.database.models import File from src.logger import get_logger logger = get_logger(__name__) # Constants for coupling thresholds HIGH_COUPLING_THRESHOLD = 5 VERY_HIGH_COUPLING_THRESHOLD = 10 HIGH_COUPLING_SCORE = 3 ENTITY_GROWTH_RATE_THRESHOLD = 20 GOD_OBJECT_RESPONSIBILITIES_THRESHOLD = 7 RAPID_COUPLING_THRESHOLD = 20 CONTEXT_SHARE_THRESHOLD = 2 # Constants for coupling buckets LOW_COUPLING_THRESHOLD = 1 MEDIUM_COUPLING_THRESHOLD = 3 class DomainPatternAnalyzer: """Analyze domain patterns, anti-patterns, and evolution.""" def __init__(self, db_session: AsyncSession) -> None: """Initialize the pattern analyzer. Args: db_session: Database session """ self.db_session = db_session async def analyze_cross_context_coupling( self, repository_id: int | None = None, ) -> dict[str, Any]: """Analyze coupling between bounded contexts. Args: repository_id: Optional repository filter Returns: Coupling analysis with metrics and recommendations """ # Get all contexts query = select(BoundedContext) if repository_id: # Filter by repository through entities query = ( query.join( BoundedContextMembership, BoundedContext.id == BoundedContextMembership.bounded_context_id, ) .join( DomainEntity, BoundedContextMembership.domain_entity_id == DomainEntity.id, ) .join( File, # SQLite compatibility: use JSON contains instead of array any DomainEntity.source_entities.contains(File.id), ) .where(File.repository_id == repository_id) .distinct() ) result = await self.db_session.execute(query) contexts = result.scalars().all() coupling_analysis: dict[str, Any] = { "contexts": [], "high_coupling_pairs": [], "recommendations": [], "metrics": { "average_coupling": 0, "max_coupling": 0, "coupling_distribution": {}, }, } # Analyze each context context_metrics: dict[int, dict[str, Any]] = {} total_coupling = 0.0 coupling_scores: list[float] = [] for context in contexts: # Get entities in this context membership_result = await self.db_session.execute( select(DomainEntity) .join(BoundedContextMembership) .where(BoundedContextMembership.bounded_context_id == context.id), ) entities = membership_result.scalars().all() entity_ids = [e.id for e in entities] # Count outgoing relationships to other contexts outgoing_result = await self.db_session.execute( select( DomainRelationship, DomainEntity.id, BoundedContextMembership.bounded_context_id, ) .join( DomainEntity, DomainRelationship.target_entity_id == DomainEntity.id, ) .join( BoundedContextMembership, DomainEntity.id == BoundedContextMembership.domain_entity_id, ) .where( DomainRelationship.source_entity_id.in_(entity_ids), BoundedContextMembership.bounded_context_id != context.id, ), ) # Group by target context coupling_by_context: dict[int, int] = defaultdict(int) relationship_types = defaultdict(set) for rel, _target_entity_id, target_context_id in outgoing_result: t_id = target_context_id coupling_by_context[t_id] += 1 relationship_types[t_id].add(rel.relationship_type) # Calculate metrics total_outgoing = sum(coupling_by_context.values()) context_coupling_score = total_outgoing / max(len(entities), 1) context_metrics[context.id] = { "name": context.name, "entity_count": len(entities), "outgoing_relationships": total_outgoing, "coupling_score": context_coupling_score, "coupled_contexts": len(coupling_by_context), "coupling_details": dict(coupling_by_context), "relationship_types": { k: list(v) for k, v in relationship_types.items() }, } total_coupling += context_coupling_score coupling_scores.append(context_coupling_score) cast("list[dict[str, Any]]", coupling_analysis["contexts"]).append( { "name": context.name, "coupling_score": round(context_coupling_score, 2), "outgoing_dependencies": total_outgoing, "coupled_with": len(coupling_by_context), }, ) # Find high coupling pairs for ctx1_metrics in context_metrics.values(): for ctx2_id, count in ctx1_metrics["coupling_details"].items(): if count > HIGH_COUPLING_THRESHOLD: ctx2_metrics = context_metrics.get(ctx2_id, {}) cast( "list[dict[str, Any]]", coupling_analysis["high_coupling_pairs"], ).append( { "source": ctx1_metrics["name"], "target": ctx2_metrics.get("name", "Unknown"), "relationship_count": count, "relationship_types": ctx1_metrics[ "relationship_types" ].get(ctx2_id, []), "recommendation": self._get_coupling_recommendation( count, ctx1_metrics["relationship_types"].get(ctx2_id, []), ), }, ) # Calculate overall metrics if coupling_scores: coupling_analysis["metrics"]["average_coupling"] = round( total_coupling / len(coupling_scores), 2, ) coupling_analysis["metrics"]["max_coupling"] = round( max(coupling_scores), 2, ) # Distribution for score in coupling_scores: if score < LOW_COUPLING_THRESHOLD: bucket = "low" elif score < MEDIUM_COUPLING_THRESHOLD: bucket = "medium" else: bucket = "high" cast( "dict[str, int]", coupling_analysis["metrics"]["coupling_distribution"], )[bucket] = ( coupling_analysis["metrics"]["coupling_distribution"].get(bucket, 0) + 1 ) # Generate recommendations coupling_analysis["recommendations"] = ( await self._generate_coupling_recommendations( context_metrics, ) ) return coupling_analysis async def suggest_context_splits( self, min_entities: int = 20, max_cohesion_threshold: float = 0.4, ) -> list[dict[str, Any]]: """Suggest how to split large bounded contexts. Args: min_entities: Minimum entities for a context to be considered max_cohesion_threshold: Maximum cohesion score to suggest split Returns: List of split suggestions with details """ # Find large contexts with low cohesion result = await self.db_session.execute( select(BoundedContext).where( BoundedContext.cohesion_score <= max_cohesion_threshold, ), ) candidates = [] for context in result.scalars().all(): # Count memberships membership_count_result = await self.db_session.execute( select(func.count(BoundedContextMembership.id)).where( BoundedContextMembership.bounded_context_id == context.id ) ) membership_count = membership_count_result.scalar() or 0 if membership_count >= min_entities: candidates.append(context) suggestions: list[dict[str, Any]] = [] for context in candidates: # Get entities and their relationships entity_result = await self.db_session.execute( select(DomainEntity) .join(BoundedContextMembership) .where(BoundedContextMembership.bounded_context_id == context.id), ) entities = entity_result.scalars().all() # Build internal relationship graph entity_graph: dict[int, set[int]] = defaultdict(set) entity_map: dict[int, DomainEntity] = {e.id: e for e in entities} for entity in entities: # Get relationships where this entity is source rel_result = await self.db_session.execute( select(DomainRelationship).where( DomainRelationship.source_entity_id == entity.id, DomainRelationship.target_entity_id.in_(entity_map.keys()), ), ) for rel in rel_result.scalars().all(): entity_graph[entity.id].add(rel.target_entity_id) entity_graph[rel.target_entity_id].add(entity.id) # Find clusters using simple connected components clusters = self._find_entity_clusters(entity_graph, entity_map) if len(clusters) > 1: suggestion: dict[str, Any] = { "context": context.name, "current_size": len(entities), "cohesion_score": context.cohesion_score, "suggested_splits": [], "reasoning": f"Low cohesion ({context.cohesion_score:.2f}) indicates weak relationships between entity groups", } for _i, cluster in enumerate(clusters): cluster_entities = [entity_map[eid] for eid in cluster] # Find potential aggregate roots aggregate_roots = [ e for e in cluster_entities if e.entity_type == "aggregate_root" ] # Determine cluster theme cluster_name = self._suggest_cluster_name( cluster_entities, aggregate_roots, ) suggestion["suggested_splits"].append( { "suggested_name": cluster_name, "entity_count": len(cluster), "aggregate_roots": [a.name for a in aggregate_roots], "key_entities": [e.name for e in cluster_entities[:5]], "cohesion_estimate": self._estimate_cluster_cohesion( cluster, entity_graph, ), }, ) suggestions.append(suggestion) return list(suggestions) async def detect_anti_patterns( self, _repository_id: int | None = None, ) -> dict[str, list[dict[str, Any]]]: """Detect DDD anti-patterns in the codebase. Args: repository_id: Optional repository filter Returns: Dictionary of anti-patterns found """ anti_patterns: dict[str, list[dict[str, Any]]] = { "anemic_domain_models": [], "god_objects": [], "circular_dependencies": [], "missing_aggregate_roots": [], "chatty_contexts": [], "shared_kernel_abuse": [], } # 1. Detect anemic domain models (entities with no business rules) query = select(DomainEntity).where( DomainEntity.entity_type.in_(["entity", "aggregate_root"]), # SQLite compatibility: check JSON array length func.json_array_length(DomainEntity.business_rules) == 0, func.json_array_length(DomainEntity.invariants) == 0, ) result = await self.db_session.execute(query) for entity in result.scalars().all(): anti_patterns["anemic_domain_models"].append( { "entity": entity.name, "type": entity.entity_type, "issue": "No business rules or invariants defined", "recommendation": "Add business logic to make this a rich domain model", "severity": "medium", }, ) # 2. Detect god objects (entities with too many responsibilities) query = select(DomainEntity).where( func.json_array_length(DomainEntity.responsibilities) > GOD_OBJECT_RESPONSIBILITIES_THRESHOLD, ) result = await self.db_session.execute(query) for entity in result.scalars().all(): anti_patterns["god_objects"].append( { "entity": entity.name, "responsibility_count": len(entity.responsibilities), "responsibilities": [*entity.responsibilities[:5], "..."], "issue": "Too many responsibilities", "recommendation": "Split into multiple focused entities or extract domain services", "severity": "high", }, ) # 3. Detect circular dependencies circular_deps = await self._find_circular_dependencies() anti_patterns["circular_dependencies"] = circular_deps # 4. Detect missing aggregate roots (contexts with only entities) contexts_result = await self.db_session.execute( select(BoundedContext), ) for context in contexts_result.scalars().all(): # Get entity types in context entity_result = await self.db_session.execute( select(DomainEntity.entity_type) .join(BoundedContextMembership) .where(BoundedContextMembership.bounded_context_id == context.id) .distinct(), ) entity_types = [row[0] for row in entity_result] if "entity" in entity_types and "aggregate_root" not in entity_types: anti_patterns["missing_aggregate_roots"].append( { "context": context.name, "issue": "Context has entities but no aggregate root", "recommendation": "Identify the main entity that maintains consistency and make it an aggregate root", "severity": "high", }, ) # 5. Detect chatty contexts (too many inter-context calls) chatty = await self._detect_chatty_contexts() anti_patterns["chatty_contexts"] = chatty # 6. Detect shared kernel abuse shared_kernel = await self._detect_shared_kernel_abuse() anti_patterns["shared_kernel_abuse"] = shared_kernel return anti_patterns async def analyze_evolution( self, _repository_id: int, days: int = 30, ) -> dict[str, Any]: """Analyze how the domain model evolved over time. Args: repository_id: Repository to analyze days: Number of days to look back Returns: Evolution analysis with trends and changes """ since_date = datetime.now(UTC) - timedelta(days=days) evolution: dict[str, Any] = { "time_period": f"Last {days} days", "entity_changes": { "added": [], "removed": [], "modified": [], }, "context_changes": { "added": [], "removed": [], "resized": [], }, "relationship_changes": { "new_dependencies": [], "removed_dependencies": [], }, "trends": { "entity_growth_rate": 0, "context_stability": 0, "coupling_trend": "stable", }, "insights": [], } # Get entities created in time period new_entities_result = await self.db_session.execute( select(DomainEntity).where( DomainEntity.created_at >= since_date, ), ) for entity in new_entities_result.scalars().all(): evolution["entity_changes"]["added"].append( { "name": entity.name, "type": entity.entity_type, "created": entity.created_at.isoformat(), }, ) # Get contexts created in time period new_contexts_result = await self.db_session.execute( select(BoundedContext).where(BoundedContext.created_at >= since_date), ) for context in new_contexts_result.scalars().all(): # Count memberships for this context membership_count_result = await self.db_session.execute( select(func.count(BoundedContextMembership.id)).where( BoundedContextMembership.bounded_context_id == context.id ) ) membership_count = membership_count_result.scalar() or 0 evolution["context_changes"]["added"].append( { "name": context.name, "created": context.created_at.isoformat(), "size": membership_count, }, ) # Calculate trends # For simplicity with SQLite, just count all entities # In production with PostgreSQL, you could filter by repository total_entities = await self.db_session.execute( select(func.count(DomainEntity.id)) ) entity_count = total_entities.scalar() or 0 if entity_count > 0: growth_rate = len(evolution["entity_changes"]["added"]) / entity_count evolution["trends"]["entity_growth_rate"] = round(growth_rate * 100, 1) # Generate insights if evolution["trends"]["entity_growth_rate"] > ENTITY_GROWTH_RATE_THRESHOLD: cast("list[str]", evolution["insights"]).append( "Rapid growth in domain entities indicates active feature development", ) if len(evolution["context_changes"]["added"]) > 0: cast("list[str]", evolution["insights"]).append( "New bounded contexts suggest evolving domain understanding", ) return evolution def _get_coupling_recommendation( self, relationship_count: int, relationship_types: list[str], ) -> str: """Generate recommendation for coupling issues.""" if relationship_count > VERY_HIGH_COUPLING_THRESHOLD: if "orchestrates" in relationship_types: return "Consider using events or a saga pattern to reduce orchestration coupling" if "depends_on" in relationship_types: return "High dependency coupling - consider introducing an anti-corruption layer" return "Very high coupling - evaluate if these contexts should be merged or use shared kernel pattern" if relationship_count > HIGH_COUPLING_THRESHOLD: return ( "Moderate coupling - consider if all these relationships are necessary" ) return "Acceptable coupling level" async def _generate_coupling_recommendations( self, context_metrics: dict[int, dict[str, Any]], ) -> list[str]: """Generate overall coupling recommendations.""" recommendations = [] # Find contexts with highest coupling high_coupling = [ (ctx_id, metrics) for ctx_id, metrics in context_metrics.items() if metrics["coupling_score"] > HIGH_COUPLING_SCORE ] if high_coupling: recommendations.append( f"Consider reviewing {len(high_coupling)} contexts with high coupling scores", ) # Check for asymmetric relationships for ctx_id, metrics in context_metrics.items(): for coupled_ctx_id, count in metrics["coupling_details"].items(): reverse_count = ( context_metrics.get(coupled_ctx_id, {}) .get( "coupling_details", {}, ) .get(ctx_id, 0) ) if count > HIGH_COUPLING_THRESHOLD and reverse_count == 0: recommendations.append( f"{metrics['name']} has one-way dependency on another context - " "consider if this is a customer-supplier relationship", ) break return recommendations def _find_entity_clusters( self, entity_graph: dict[int, set[int]], entity_map: dict[int, DomainEntity], ) -> list[set[int]]: """Find connected components in entity graph.""" visited = set() clusters = [] def dfs(entity_id: int, cluster: set[int]) -> None: if entity_id in visited: return visited.add(entity_id) cluster.add(entity_id) for neighbor in entity_graph.get(entity_id, set()): dfs(neighbor, cluster) for entity_id in entity_map: if entity_id not in visited: cluster: set[int] = set() dfs(entity_id, cluster) if cluster: clusters.append(cluster) return clusters def _suggest_cluster_name( self, entities: list[DomainEntity], aggregate_roots: list[DomainEntity], ) -> str: """Suggest a name for an entity cluster.""" if aggregate_roots: return f"{aggregate_roots[0].name} Context" if entities: # Use most common word in entity names words: dict[str, int] = defaultdict(int) for entity in entities: for word in entity.name.split(): words[word.lower()] += 1 if words: most_common = max(words.items(), key=lambda x: x[1])[0] return f"{most_common.title()} Context" return "Unnamed Context" def _estimate_cluster_cohesion( self, cluster: set[int], entity_graph: dict[int, set[int]], ) -> float: """Estimate cohesion of an entity cluster.""" if len(cluster) <= 1: return 1.0 internal_edges = 0 for entity_id in cluster: for neighbor in entity_graph.get(entity_id, set()): if neighbor in cluster: internal_edges += 1 # Divide by 2 since we count each edge twice internal_edges //= 2 # Maximum possible edges max_edges = len(cluster) * (len(cluster) - 1) / 2 return internal_edges / max_edges if max_edges > 0 else 0 async def _find_circular_dependencies(self) -> list[dict[str, Any]]: """Find circular dependencies between entities.""" # This is a simplified version - could be enhanced with cycle detection result = await self.db_session.execute( select( DomainRelationship.source_entity_id, DomainRelationship.target_entity_id, ), ) # Build adjacency list graph = defaultdict(set) for source_id, target_id in result: graph[source_id].add(target_id) cycles = [] visited = set() rec_stack = set() def has_cycle(node: int, path: list[int]) -> bool: visited.add(node) rec_stack.add(node) path.append(node) for neighbor in graph.get(node, set()): if neighbor not in visited: if has_cycle(neighbor, path.copy()): return True elif neighbor in rec_stack: # Found cycle cycle_start = path.index(neighbor) cycle_path = [*path[cycle_start:], neighbor] cycles.append(cycle_path) rec_stack.remove(node) return False for node in graph: if node not in visited: has_cycle(node, []) # Convert to readable format circular_deps = [] for cycle in cycles[:5]: # Limit to 5 # Get entity names entities = [] for entity_id in cycle: entity_result = await self.db_session.execute( select(DomainEntity).where(DomainEntity.id == entity_id), ) entity = entity_result.scalar_one_or_none() if entity: entities.append(entity.name) if len(entities) > 1: # Coerce to strings for join to satisfy typing cycle_names: list[str] = [str(name) for name in entities] circular_deps.append( { "cycle": " -> ".join(cycle_names), "length": len(cycle_names) - 1, "issue": "Circular dependency creates tight coupling", "recommendation": "Break the cycle by introducing events or inverting dependencies", "severity": "high", }, ) return circular_deps async def _detect_chatty_contexts(self) -> list[dict[str, Any]]: """Detect contexts with excessive inter-context communication.""" # Count relationships between contexts result = await self.db_session.execute( select( BoundedContextMembership.bounded_context_id, func.count(DomainRelationship.id).label("relationship_count"), ) .join( DomainEntity, BoundedContextMembership.domain_entity_id == DomainEntity.id, ) .join( DomainRelationship, DomainEntity.id == DomainRelationship.source_entity_id, ) .group_by(BoundedContextMembership.bounded_context_id) .having(func.count(DomainRelationship.id) > RAPID_COUPLING_THRESHOLD), ) chatty = [] for context_id, count in result: context_result = await self.db_session.execute( select(BoundedContext).where(BoundedContext.id == context_id), ) context = context_result.scalar_one_or_none() if context: chatty.append( { "context": context.name, "external_relationships": count, "issue": "Excessive communication with other contexts", "recommendation": "Consider if this context has too many responsibilities or needs better boundaries", "severity": "medium", }, ) return chatty async def _detect_shared_kernel_abuse(self) -> list[dict[str, Any]]: """Detect overuse of shared kernel pattern.""" # Find contexts that share many entities (simplified check) _shared_entities: dict[int, set[int]] = defaultdict(set) result = await self.db_session.execute( select( DomainEntity.id, BoundedContextMembership.bounded_context_id, ).join(BoundedContextMembership), ) entity_contexts = defaultdict(set) for entity_id, context_id in result: entity_contexts[entity_id].add(context_id) # Find entities in multiple contexts shared_kernel_issues = [] for entity_id, contexts in entity_contexts.items(): if len(contexts) > CONTEXT_SHARE_THRESHOLD: entity_result = await self.db_session.execute( select(DomainEntity).where(DomainEntity.id == entity_id), ) entity = entity_result.scalar_one_or_none() if entity: shared_kernel_issues.append( { "entity": entity.name, "shared_by_contexts": len(contexts), "issue": "Entity shared by too many contexts", "recommendation": "Consider if this truly needs to be shared or if each context needs its own version", "severity": "medium", }, ) return shared_kernel_issues

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/johannhartmann/mcpcodeanalysis'

If you have feedback or need assistance with the MCP directory API, please join our Discord server