Documentation Generator MCP Server

by srwlli
consistency_checker.py (11.6 kB)
""" Consistency Checker Generator for Tool #10 (check_consistency). Provides lightweight, fast quality gate for checking code changes against established standards. Designed for pre-commit hooks and CI/CD pipelines with <1s performance target. """ from pathlib import Path from typing import List import subprocess import time # Import types from type_defs import StandardsDataDict, AuditViolationDict, ConsistencyResultDict # Import AuditGenerator for composition (reuse violation detection) from generators.audit_generator import AuditGenerator # Import logger from logger_config import logger class ConsistencyChecker: """ Consistency checker for validating code changes against standards. Uses composition with AuditGenerator to reuse violation detection logic. Adds git integration for auto-detecting changed files and terminal-friendly output. """ def __init__(self, project_path: Path, standards_dir: Path): """ Initialize ConsistencyChecker. Args: project_path: Path to project root standards_dir: Path to standards directory """ self.project_path = project_path self.standards_dir = standards_dir # Create AuditGenerator instance for composition self.audit_generator = AuditGenerator(project_path, standards_dir) logger.debug(f"ConsistencyChecker initialized", extra={ 'project_path': str(project_path), 'standards_dir': str(standards_dir) }) # Git Integration Methods (Phase 2) def is_git_repository(self) -> bool: """ Check if project is a git repository. Returns: True if git repo, False otherwise """ try: result = subprocess.run( ['git', 'rev-parse', '--is-inside-work-tree'], cwd=self.project_path, capture_output=True, text=True, timeout=5 ) is_repo = result.returncode == 0 logger.debug(f"Git repository check: {is_repo}", extra={'project_path': str(self.project_path)}) return is_repo except (subprocess.SubprocessError, FileNotFoundError, OSError) as e: logger.debug(f"Git check failed: {str(e)}", extra={'error_type': type(e).__name__}) return False def detect_changed_files(self, mode: str = 'staged') -> List[Path]: """ Detect changed files using git diff. 

        Args:
            mode: Detection mode - 'staged', 'unstaged', or 'all'

        Returns:
            List of Path objects (relative to project root)
        """
        logger.debug("Detecting changed files", extra={'mode': mode})

        if not self.is_git_repository():
            logger.warning("Not a git repository, cannot auto-detect files")
            return []

        try:
            # Determine git command based on mode
            if mode == 'staged':
                cmd = ['git', 'diff', '--name-only', '--cached']
            elif mode == 'unstaged':
                cmd = ['git', 'diff', '--name-only']
            elif mode == 'all':
                # Get both staged and unstaged
                staged = self.detect_changed_files('staged')
                unstaged = self.detect_changed_files('unstaged')
                return list(set(staged + unstaged))  # Deduplicate
            else:
                raise ValueError(f"Invalid mode: {mode}")

            result = subprocess.run(
                cmd,
                cwd=self.project_path,
                capture_output=True,
                text=True,
                timeout=10
            )

            if result.returncode != 0:
                logger.warning(f"Git diff failed: {result.stderr}")
                return []

            # Parse output
            files = [
                Path(line.strip())
                for line in result.stdout.splitlines()
                if line.strip()
            ]

            # Filter out deleted files (they don't exist on filesystem)
            existing_files = [
                f for f in files
                if (self.project_path / f).exists()
            ]

            logger.info(f"Detected {len(existing_files)} changed files", extra={
                'mode': mode,
                'total_changed': len(files),
                'existing': len(existing_files)
            })

            return existing_files

        except subprocess.TimeoutExpired:
            logger.error("Git diff timed out after 10 seconds")
            return []
        except (subprocess.SubprocessError, OSError) as e:
            logger.error(f"Git operation failed: {str(e)}",
                         extra={'error_type': type(e).__name__})
            return []

    def get_file_content_from_git(self, file_path: Path, ref: str = 'HEAD') -> str:
        """
        Get file content from git at specific ref.

        Args:
            file_path: Relative path to file
            ref: Git reference (default: HEAD)

        Returns:
            File content as string (empty string if not in git)
        """
        try:
            result = subprocess.run(
                ['git', 'show', f'{ref}:{file_path}'],
                cwd=self.project_path,
                capture_output=True,
                text=True,
                timeout=5
            )
            if result.returncode != 0:
                return ""  # File not in git
            return result.stdout
        except (subprocess.SubprocessError, FileNotFoundError, OSError):
            return ""

    # Violation Detection Methods (Phase 3)

    def check_files(self, files: List[Path], standards: StandardsDataDict,
                    scope: List[str]) -> List[AuditViolationDict]:
        """
        Check specific files for violations using AuditGenerator.

        Args:
            files: List of file paths to check (relative to project root)
            standards: Parsed standards data
            scope: Which standards to check ('ui_patterns', 'behavior_patterns',
                   'ux_patterns', 'all')

        Returns:
            List of violations found
        """
        violations = []

        logger.info(f"Checking {len(files)} files", extra={'scope': scope})

        for file_path in files:
            file_start = time.time()

            # Check only relevant file types (tsx, jsx, ts, js) before reading,
            # so non-code and binary files are never opened
            if not str(file_path).endswith(('.tsx', '.jsx', '.ts', '.js')):
                logger.debug(f"Skipping non-code file: {file_path}")
                continue

            # Read file content
            full_path = self.project_path / file_path
            try:
                with open(full_path, 'r', encoding='utf-8') as f:
                    content = f.read()
            except (OSError, UnicodeDecodeError) as e:
                logger.warning(f"Skipping file {file_path}: {str(e)}")
                continue

            # Call appropriate AuditGenerator methods based on scope
            file_violations = []

            if 'all' in scope or 'ui_patterns' in scope:
                ui_violations = self.audit_generator.detect_ui_violations(
                    content, file_path, standards.get('ui_patterns', {})
                )
                file_violations.extend(ui_violations)

            if 'all' in scope or 'behavior_patterns' in scope:
                behavior_violations = self.audit_generator.detect_behavior_violations(
                    content, file_path, standards.get('behavior_patterns', {})
                )
                file_violations.extend(behavior_violations)

            if 'all' in scope or 'ux_patterns' in scope:
                ux_violations = self.audit_generator.detect_ux_violations(
                    content, file_path, standards.get('ux_patterns', {})
                )
                file_violations.extend(ux_violations)

            violations.extend(file_violations)

            file_duration = time.time() - file_start
            logger.debug(f"Checked {file_path} in {file_duration:.3f}s", extra={
                'file_path': str(file_path),
                'violations_found': len(file_violations),
                'duration': file_duration
            })

        logger.info(f"Check complete: {len(violations)} violations found", extra={
            'files_checked': len(files),
            'total_violations': len(violations)
        })

        return violations

    def filter_by_severity_threshold(self, violations: List[AuditViolationDict],
                                     threshold: str) -> List[AuditViolationDict]:
        """
        Filter violations to only those >= severity threshold.

        Args:
            violations: List of all violations
            threshold: Severity threshold ('critical', 'major', or 'minor')

        Returns:
            Filtered list of violations
        """
        # Severity hierarchy: critical > major > minor
        severity_order = {'critical': 3, 'major': 2, 'minor': 1}
        threshold_level = severity_order.get(threshold, 1)

        filtered = [
            v for v in violations
            if severity_order.get(v.get('severity', 'minor'), 1) >= threshold_level
        ]

        logger.debug(f"Severity filtering: {len(violations)} -> {len(filtered)}", extra={
            'threshold': threshold,
            'original_count': len(violations),
            'filtered_count': len(filtered)
        })

        return filtered

    # Output Formatting Methods (Phase 4)

    def generate_check_summary(self, violations: List[AuditViolationDict],
                               files_checked: int, duration: float) -> str:
        """
        Generate concise terminal-friendly summary.

        Args:
            violations: List of violations found
            files_checked: Number of files that were checked
            duration: Check duration in seconds

        Returns:
            Formatted summary string
        """
        # Count violations by severity
        critical_count = sum(1 for v in violations if v.get('severity') == 'critical')
        major_count = sum(1 for v in violations if v.get('severity') == 'major')
        minor_count = sum(1 for v in violations if v.get('severity') == 'minor')

        # Determine status
        status = "PASSED" if len(violations) == 0 else "FAILED"
        status_symbol = "[PASS]" if len(violations) == 0 else "[FAIL]"

        # Build summary
        lines = []
        lines.append(f"{status_symbol} Consistency check {status}\n")
        lines.append(f"{len(violations)} violations found "
                     f"({critical_count} critical, {major_count} major, {minor_count} minor)")
        lines.append(f"Files checked: {files_checked}")
        lines.append(f"Duration: {duration:.2f}s\n")

        if len(violations) == 0:
            lines.append("All files comply with established standards.")
        else:
            lines.append("Violations:")
            for violation in violations:
                lines.append(" " + self.format_violation_for_terminal(violation))

        return '\n'.join(lines)

    def format_violation_for_terminal(self, violation: AuditViolationDict) -> str:
        """
        Format single violation for terminal display.

        Args:
            violation: Violation dictionary

        Returns:
            Formatted string (file:line - [severity] message)
        """
        file_path = violation.get('file_path', 'unknown')
        line = violation.get('line_number', 0)
        severity = violation.get('severity', 'minor')
        message = violation.get('message', 'Unknown violation')

        return f"{file_path}:{line} - [{severity}] {message}"
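
A minimal sketch of how these pieces might compose into a pre-commit-style gate. This is not part of the source file: the import path and the empty standards placeholder are assumptions; the real server presumably parses StandardsDataDict from the standards directory, which is not shown here.

# pre_commit_check.py - hypothetical wiring for ConsistencyChecker.
# ASSUMPTIONS: the import path below, and the placeholder `standards` dict
# standing in for standards parsed from standards_dir by the real server.
import sys
import time
from pathlib import Path

from consistency_checker import ConsistencyChecker  # adjust to the actual package path

project = Path('.')
checker = ConsistencyChecker(project, project / 'standards')

# Placeholder: real standards data would be loaded from the standards directory.
standards = {'ui_patterns': {}, 'behavior_patterns': {}, 'ux_patterns': {}}

start = time.time()
files = checker.detect_changed_files(mode='staged')          # staged changes only
violations = checker.check_files(files, standards, ['all'])  # run all pattern checks
blocking = checker.filter_by_severity_threshold(violations, 'major')

print(checker.generate_check_summary(blocking, len(files), time.time() - start))
sys.exit(1 if blocking else 0)  # non-zero exit status blocks the commit

Filtering to 'major' before summarizing keeps the hook focused on violations severe enough to block a commit; minor ones can still surface in a full audit.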

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/srwlli/docs-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.