Skip to main content
Glama
srwlli

Documentation Generator MCP Server

by srwlli
plan_validator.py21.8 kB
""" Plan validation for implementation plans (QUA-001). Validates implementation plans against quality checklist from feature-implementation-planning-standard.json. Scores plans 0-100 based on completeness, quality, and autonomy. Enables iterative review loop. """ from pathlib import Path from typing import List import json import re import time from type_defs import ValidationResultDict, ValidationIssueDict from logger_config import logger class PlanValidator: """Validates implementation plans against quality checklist.""" def __init__(self, plan_path: Path): """Initialize validator with path to plan file. Args: plan_path: Path to plan JSON file """ self.plan_path = plan_path self.plan_data = None self.issues: List[ValidationIssueDict] = [] def validate(self) -> ValidationResultDict: """Validate plan and return results. Returns: ValidationResultDict with score, issues, and checklist results """ start_time = time.time() logger.info(f'Starting validation of plan: {self.plan_path}') # Load plan JSON self._load_plan() # Run validators self.validate_structure() self.validate_completeness() self.validate_quality() self.validate_autonomy() # Check for circular dependencies if 'UNIVERSAL_PLANNING_STRUCTURE' in self.plan_data: structure = self.plan_data['UNIVERSAL_PLANNING_STRUCTURE'] if '6_implementation_phases' in structure: self._validate_no_circular_dependencies(structure['6_implementation_phases']) # Calculate score and determine result score = self.calculate_score() result = self.determine_result(score) approved = score >= 90 # Build checklist results checklist_results = self._build_checklist_results() # Track performance duration = time.time() - start_time logger.info(f'Validation complete: score={score}, result={result}, issues={len(self.issues)}, duration={duration:.2f}s') return ValidationResultDict( validation_result=result, score=score, issues=self.issues, checklist_results=checklist_results, approved=approved ) def _load_plan(self): """Load and parse plan JSON file.""" try: with open(self.plan_path, 'r', encoding='utf-8') as f: self.plan_data = json.load(f) except json.JSONDecodeError as e: raise ValueError(f'Invalid JSON in plan file: {str(e)}') except FileNotFoundError: raise FileNotFoundError(f'Plan file not found: {self.plan_path}') def validate_structure(self): """Validate plan has all required sections.""" logger.debug('Validating structure...') required_sections = [ '0_preparation', '1_executive_summary', '2_risk_assessment', '3_current_state_analysis', '4_key_features', '5_task_id_system', '6_implementation_phases', '7_testing_strategy', '8_success_criteria', '9_implementation_checklist' ] # Check META_DOCUMENTATION if 'META_DOCUMENTATION' not in self.plan_data: self.issues.append({ 'severity': 'critical', 'section': 'structure', 'issue': 'Missing META_DOCUMENTATION section', 'suggestion': 'Add META_DOCUMENTATION section with plan_id, plan_name, status, estimated_effort' }) # Check UNIVERSAL_PLANNING_STRUCTURE if 'UNIVERSAL_PLANNING_STRUCTURE' not in self.plan_data: self.issues.append({ 'severity': 'critical', 'section': 'structure', 'issue': 'Missing UNIVERSAL_PLANNING_STRUCTURE section', 'suggestion': 'Add UNIVERSAL_PLANNING_STRUCTURE section containing sections 0-9' }) return # Can't check subsections if parent missing # Check each required section 0-9 structure = self.plan_data['UNIVERSAL_PLANNING_STRUCTURE'] for section in required_sections: if section not in structure: self.issues.append({ 'severity': 'critical', 'section': 'structure', 'issue': f'Missing section {section}', 'suggestion': f'Add section {section} to UNIVERSAL_PLANNING_STRUCTURE' }) def validate_completeness(self): """Validate no placeholders, all task IDs valid.""" logger.debug('Validating completeness...') # Check for placeholder text plan_json_str = json.dumps(self.plan_data) placeholder_pattern = r'\b(TBD|TODO|\[placeholder\]|Coming soon|Fill this in|to be determined)\b' matches = re.finditer(placeholder_pattern, plan_json_str, re.IGNORECASE) for match in matches: self.issues.append({ 'severity': 'major', 'section': 'completeness', 'issue': f'Placeholder text found: "{match.group()}"', 'suggestion': 'Replace placeholder with actual content' }) # Validate task IDs if implementation_phases exists if 'UNIVERSAL_PLANNING_STRUCTURE' in self.plan_data: structure = self.plan_data['UNIVERSAL_PLANNING_STRUCTURE'] if '6_implementation_phases' in structure: self._validate_task_ids(structure['6_implementation_phases']) def _validate_task_ids(self, phases_data): """Validate task IDs are unique and dependencies are valid.""" task_ids = set() dependencies = [] # Extract all task IDs and dependencies for phase_key, phase in phases_data.items(): if isinstance(phase, dict) and 'tasks' in phase: for task in phase['tasks']: if isinstance(task, dict) and 'id' in task: task_id = task['id'] # Check uniqueness if task_id in task_ids: self.issues.append({ 'severity': 'critical', 'section': 'completeness', 'issue': f'Duplicate task ID: {task_id}', 'suggestion': 'Each task ID must be unique' }) task_ids.add(task_id) # Collect dependencies if 'depends_on' in task and task['depends_on']: for dep in task['depends_on']: dependencies.append((task_id, dep)) # Validate dependencies reference existing tasks for task_id, dep_id in dependencies: if dep_id not in task_ids: self.issues.append({ 'severity': 'critical', 'section': 'completeness', 'issue': f'Task {task_id} depends on non-existent task {dep_id}', 'suggestion': f'Ensure task {dep_id} exists or remove dependency' }) def _validate_no_circular_dependencies(self, phases_data): """Detect circular dependencies using DFS.""" # Build adjacency list graph = {} task_ids = set() for phase_key, phase in phases_data.items(): if isinstance(phase, dict) and 'tasks' in phase: for task in phase['tasks']: if isinstance(task, dict) and 'id' in task: task_id = task['id'] task_ids.add(task_id) graph[task_id] = task.get('depends_on', []) # DFS cycle detection visited = set() rec_stack = set() def has_cycle(node): visited.add(node) rec_stack.add(node) for neighbor in graph.get(node, []): if neighbor not in visited: if has_cycle(neighbor): return True elif neighbor in rec_stack: # Cycle detected self.issues.append({ 'severity': 'critical', 'section': 'completeness', 'issue': f'Circular dependency detected involving task {node} and {neighbor}', 'suggestion': 'Remove circular dependency to create valid execution order' }) return True rec_stack.remove(node) return False for task_id in task_ids: if task_id not in visited: has_cycle(task_id) def validate_quality(self): """Validate task descriptions clear, success criteria measurable.""" logger.debug('Validating quality...') if 'UNIVERSAL_PLANNING_STRUCTURE' not in self.plan_data: return structure = self.plan_data['UNIVERSAL_PLANNING_STRUCTURE'] # Validate task descriptions if '6_implementation_phases' in structure: self._validate_task_descriptions(structure['6_implementation_phases']) self._validate_phase_fields(structure['6_implementation_phases']) # Validate success criteria if '8_success_criteria' in structure: self._validate_success_criteria(structure['8_success_criteria']) # Validate edge cases if '7_testing_strategy' in structure: self._validate_edge_cases(structure['7_testing_strategy']) def _validate_task_descriptions(self, phases_data): """Check task descriptions are clear and specific.""" for phase_key, phase in phases_data.items(): if isinstance(phase, dict) and 'tasks' in phase: for task in phase['tasks']: if isinstance(task, dict) and 'description' in task: desc = task['description'] word_count = len(desc.split()) task_id = task.get('id', 'unknown') if word_count < 10: self.issues.append({ 'severity': 'major', 'section': 'quality', 'issue': f'Task {task_id} description too short ({word_count} words)', 'suggestion': 'Expand description to at least 10 words with specific details' }) elif word_count < 20: self.issues.append({ 'severity': 'minor', 'section': 'quality', 'issue': f'Task {task_id} description could be more detailed ({word_count} words)', 'suggestion': 'Consider expanding description to 20+ words for clarity' }) def _validate_phase_fields(self, phases_data): """Check phases have required complexity and effort_level fields.""" for phase_key, phase in phases_data.items(): if isinstance(phase, dict): phase_name = phase.get('title', phase_key) # Check for complexity field if 'complexity' not in phase: self.issues.append({ 'severity': 'major', 'section': 'quality', 'issue': f'Phase {phase_name} missing required "complexity" field', 'suggestion': 'Add complexity rating (low | medium | high | very_high)' }) elif phase['complexity'] not in ['low', 'medium', 'high', 'very_high']: self.issues.append({ 'severity': 'minor', 'section': 'quality', 'issue': f'Phase {phase_name} has invalid complexity value: {phase["complexity"]}', 'suggestion': 'Use one of: low, medium, high, very_high' }) # Check for effort_level field if 'effort_level' not in phase: self.issues.append({ 'severity': 'major', 'section': 'quality', 'issue': f'Phase {phase_name} missing required "effort_level" field', 'suggestion': 'Add effort_level (1-5 scale: 1=trivial, 5=major undertaking)' }) elif not isinstance(phase['effort_level'], int) or phase['effort_level'] < 1 or phase['effort_level'] > 5: self.issues.append({ 'severity': 'minor', 'section': 'quality', 'issue': f'Phase {phase_name} has invalid effort_level: {phase.get("effort_level")}', 'suggestion': 'Use integer 1-5 (1=trivial, 5=major undertaking)' }) # Warn if old "duration" field is still present if 'duration' in phase: self.issues.append({ 'severity': 'major', 'section': 'quality', 'issue': f'Phase {phase_name} contains deprecated "duration" field', 'suggestion': 'Remove "duration" field - use complexity and effort_level instead' }) def _validate_success_criteria(self, criteria_data): """Check success criteria are measurable.""" criteria_json = json.dumps(criteria_data) # Look for numbers, percentages, time units measurable_pattern = r'\d+|\b(\d+\.\d+|percent|%|seconds?|ms|minutes?|hours?|>=|<=|>|<)\b' matches = re.findall(measurable_pattern, criteria_json, re.IGNORECASE) if len(matches) < 3: # Should have several measurable criteria self.issues.append({ 'severity': 'major', 'section': 'quality', 'issue': 'Success criteria lack measurable metrics', 'suggestion': 'Add specific metrics (numbers, percentages, thresholds) to success criteria' }) def _validate_edge_cases(self, testing_data): """Check edge cases are documented.""" edge_case_json = json.dumps(testing_data) # Look for edge case mentions edge_case_pattern = r'edge.?case|scenario|boundary|invalid|empty|null|error|exception' matches = re.findall(edge_case_pattern, edge_case_json, re.IGNORECASE) if len(matches) < 5: self.issues.append({ 'severity': 'major', 'section': 'quality', 'issue': f'Insufficient edge case coverage (found {len(matches)} mentions, need 5+)', 'suggestion': 'Document at least 5-10 edge case scenarios in testing strategy' }) def validate_autonomy(self): """Validate no ambiguity, implementable without clarification.""" logger.debug('Validating autonomy...') plan_json_str = json.dumps(self.plan_data) # Check for ambiguous phrases ambiguous_pattern = r'\b(might|could|maybe|possibly|perhaps|unclear|TBD|to be determined|needs clarification)\b' matches = list(re.finditer(ambiguous_pattern, plan_json_str, re.IGNORECASE)) for match in matches[:5]: # Limit to first 5 to avoid spam self.issues.append({ 'severity': 'major', 'section': 'autonomy', 'issue': f'Ambiguous phrase found: "{match.group()}"', 'suggestion': 'Replace with definitive language - make clear decisions' }) # Check for questions question_pattern = r'(Should we|What about|What if|How do we|Which|\?)' matches = list(re.finditer(question_pattern, plan_json_str, re.IGNORECASE)) for match in matches[:5]: # Limit to first 5 self.issues.append({ 'severity': 'major', 'section': 'autonomy', 'issue': f'Question found in plan: "{match.group()}"', 'suggestion': 'Answer the question in the plan - no unresolved questions' }) def calculate_score(self) -> int: """Calculate 0-100 score based on issues.""" score = 100 for issue in self.issues: if issue['severity'] == 'critical': score -= 10 elif issue['severity'] == 'major': score -= 5 elif issue['severity'] == 'minor': score -= 1 return max(0, score) def determine_result(self, score: int) -> str: """Determine validation result from score.""" if score >= 90: return 'PASS' elif score >= 85: return 'PASS_WITH_WARNINGS' elif score >= 70: return 'NEEDS_REVISION' else: return 'FAIL' def _build_checklist_results(self) -> dict: """Build checklist results dict mapping all 23 checklist items to pass/fail.""" results = {} # Completeness checklist items (9 items) results['executive_summary_complete'] = self._check_executive_summary_complete() results['risk_assessment_present'] = self._check_section_present('2_risk_assessment') results['current_state_documented'] = self._check_section_present('3_current_state_analysis') results['key_features_defined'] = self._check_section_present('4_key_features') results['all_tasks_have_ids'] = not self._has_issues_matching('task.*without.*id', ignore_case=True) results['phases_defined'] = self._check_section_present('6_implementation_phases') results['testing_strategy_present'] = self._check_section_present('7_testing_strategy') results['success_criteria_defined'] = self._check_section_present('8_success_criteria') results['implementation_checklist_present'] = self._check_section_present('9_implementation_checklist') # Quality checklist items (8 items) results['no_placeholder_text'] = not self._has_issues_matching('Placeholder text found') results['task_descriptions_imperative'] = not self._has_issues_matching('description.*not.*imperative', ignore_case=True) results['success_criteria_measurable'] = not self._has_issues_matching('Success criteria lack') results['edge_cases_comprehensive'] = not self._has_issues_matching('Insufficient edge case') results['effort_estimates_realistic'] = not self._has_issues_matching('effort.*unrealistic', ignore_case=True) results['dependencies_valid'] = not self._has_issues_matching('depends on non-existent') results['security_addressed'] = not self._has_issues_matching('security.*not.*addressed', ignore_case=True) results['performance_targets_specified'] = not self._has_issues_matching('performance.*not.*specified', ignore_case=True) # Autonomy checklist items (6 items) results['no_ambiguous_phrases'] = not self._has_issues_matching('Ambiguous phrase') results['no_questions'] = not self._has_issues_matching('Question found') results['edge_case_behavior_defined'] = not self._has_issues_matching('edge case.*undefined', ignore_case=True) results['acceptance_criteria_clear'] = not self._has_issues_matching('acceptance.*unclear', ignore_case=True) results['review_gates_specified'] = not self._has_issues_matching('review.*gate.*missing', ignore_case=True) results['technical_decisions_documented'] = not self._has_issues_matching('decision.*not.*documented', ignore_case=True) # Additional validation results['no_circular_dependencies'] = not self._has_issues_matching('Circular dependency') return results def _check_executive_summary_complete(self) -> bool: """Check if executive summary has all required fields.""" if 'UNIVERSAL_PLANNING_STRUCTURE' not in self.plan_data: return False structure = self.plan_data['UNIVERSAL_PLANNING_STRUCTURE'] if '1_executive_summary' not in structure: return False summary = structure['1_executive_summary'] required_fields = ['feature_overview', 'value_proposition', 'real_world_analogy', 'primary_use_cases', 'success_metrics'] return all(field in summary for field in required_fields) def _check_section_present(self, section_name: str) -> bool: """Check if a specific section is present.""" if 'UNIVERSAL_PLANNING_STRUCTURE' not in self.plan_data: return False return section_name in self.plan_data['UNIVERSAL_PLANNING_STRUCTURE'] def _has_issues_in_section(self, section: str) -> bool: """Check if any issues in given section.""" return any(issue['section'] == section for issue in self.issues) def _has_critical_in_section(self, section: str) -> bool: """Check if any critical issues in given section.""" return any(issue['section'] == section and issue['severity'] == 'critical' for issue in self.issues) def _has_issues_matching(self, pattern: str, ignore_case: bool = False) -> bool: """Check if any issues match pattern.""" flags = re.IGNORECASE if ignore_case else 0 regex = re.compile(pattern, flags) return any(regex.search(issue['issue']) for issue in self.issues)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/srwlli/docs-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server