# cross_validator.py
""" Cross-Model Validation This module implements comprehensive validation and quality assurance mechanisms that use multiple models to cross-validate results, detect inconsistencies, and improve overall output quality through peer review processes. """ import asyncio from dataclasses import dataclass, field from enum import Enum from typing import Any, Dict, List, Optional, Set, Tuple from datetime import datetime import statistics import logging from .base import ( CollectiveIntelligenceComponent, TaskContext, ProcessingResult, ModelProvider, ModelInfo, QualityMetrics, TaskType, ModelCapability ) logger = logging.getLogger(__name__) class ValidationStrategy(Enum): """Strategies for cross-model validation.""" PEER_REVIEW = "peer_review" # Multiple models review each other's outputs ADVERSARIAL = "adversarial" # Models challenge each other's conclusions CONSENSUS_CHECK = "consensus_check" # Verify agreement across models FACT_CHECK = "fact_check" # Specialized fact-checking validation QUALITY_ASSURANCE = "quality_assurance" # General quality assessment BIAS_DETECTION = "bias_detection" # Detect potential biases in outputs class ValidationCriteria(Enum): """Criteria for validation assessment.""" ACCURACY = "accuracy" CONSISTENCY = "consistency" COMPLETENESS = "completeness" RELEVANCE = "relevance" COHERENCE = "coherence" FACTUAL_CORRECTNESS = "factual_correctness" LOGICAL_SOUNDNESS = "logical_soundness" BIAS_NEUTRALITY = "bias_neutrality" CLARITY = "clarity" APPROPRIATENESS = "appropriateness" class ValidationSeverity(Enum): """Severity levels for validation issues.""" CRITICAL = "critical" # Major errors that invalidate the result HIGH = "high" # Significant issues that need attention MEDIUM = "medium" # Moderate issues that could be improved LOW = "low" # Minor issues or suggestions INFO = "info" # Informational feedback @dataclass class ValidationIssue: """A specific validation issue found during cross-validation.""" issue_id: str criteria: ValidationCriteria severity: ValidationSeverity description: str suggestion: str confidence: float # Confidence in the issue detection evidence: str # Supporting evidence for the issue validator_model_id: str line_numbers: Optional[List[int]] = None # Specific lines if applicable metadata: Dict[str, Any] = field(default_factory=dict) @dataclass class ValidationConfig: """Configuration for cross-model validation.""" strategy: ValidationStrategy = ValidationStrategy.PEER_REVIEW min_validators: int = 2 max_validators: int = 4 criteria: List[ValidationCriteria] = field(default_factory=lambda: [ ValidationCriteria.ACCURACY, ValidationCriteria.CONSISTENCY, ValidationCriteria.COMPLETENESS, ValidationCriteria.RELEVANCE ]) confidence_threshold: float = 0.7 require_consensus: bool = False consensus_threshold: float = 0.6 include_self_validation: bool = False timeout_seconds: float = 30.0 specialized_validators: Dict[ValidationCriteria, List[str]] = field(default_factory=dict) @dataclass class ValidationResult: """Final result from cross-model validation.""" task_id: str original_result: ProcessingResult validation_report: 'ValidationReport' is_valid: bool validation_confidence: float improvement_suggestions: List[str] quality_metrics: QualityMetrics processing_time: float metadata: Dict[str, Any] = field(default_factory=dict) timestamp: datetime = field(default_factory=datetime.now) @dataclass class ValidationReport: """Comprehensive validation report for a result.""" original_result: ProcessingResult task_context: TaskContext validation_strategy: 
@dataclass
class ValidationReport:
    """Comprehensive validation report for a result."""
    original_result: ProcessingResult
    task_context: TaskContext
    validation_strategy: ValidationStrategy
    validator_models: List[str]
    issues: List[ValidationIssue]
    overall_score: float  # 0.0 to 1.0
    criteria_scores: Dict[ValidationCriteria, float]
    consensus_level: float  # Level of agreement among validators
    recommendations: List[str]
    revised_content: Optional[str] = None  # Improved version if available
    validation_time: float = 0.0
    metadata: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)


@dataclass
class ValidationResult:
    """Final result from cross-model validation."""
    task_id: str
    original_result: ProcessingResult
    validation_report: ValidationReport
    is_valid: bool
    validation_confidence: float
    improvement_suggestions: List[str]
    quality_metrics: QualityMetrics
    processing_time: float
    metadata: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)


class SpecializedValidator:
    """Base class for specialized validation components."""

    def __init__(self, criteria: ValidationCriteria, model_provider: ModelProvider):
        self.criteria = criteria
        self.model_provider = model_provider

    async def validate(
        self,
        result: ProcessingResult,
        task_context: TaskContext,
        validator_model_id: str
    ) -> List[ValidationIssue]:
        """Perform specialized validation."""
        raise NotImplementedError


class FactCheckValidator(SpecializedValidator):
    """Specialized validator for fact-checking."""

    def __init__(self, model_provider: ModelProvider):
        super().__init__(ValidationCriteria.FACTUAL_CORRECTNESS, model_provider)

    async def validate(
        self,
        result: ProcessingResult,
        task_context: TaskContext,
        validator_model_id: str
    ) -> List[ValidationIssue]:
        """Validate factual accuracy of the result."""
        # Create fact-checking prompt
        fact_check_prompt = f"""
        Please fact-check the following response for accuracy:

        Original Task: {task_context.content}

        Response to Check: {result.content}

        Identify any factual errors, inaccuracies, or unsupported claims.
        For each issue found, provide:
        1. Description of the error
        2. Correct information if known
        3. Confidence level (0.0-1.0)

        Format your response as a structured analysis.
""" fact_check_task = TaskContext( task_id=f"fact_check_{result.task_id}", task_type=TaskType.FACTUAL, content=fact_check_prompt ) try: validation_result = await self.model_provider.process_task( fact_check_task, validator_model_id ) # Parse the validation result to extract issues issues = self._parse_fact_check_result(validation_result, validator_model_id) return issues except Exception as e: logger.warning(f"Fact-checking failed with model {validator_model_id}: {str(e)}") return [] def _parse_fact_check_result( self, validation_result: ProcessingResult, validator_model_id: str ) -> List[ValidationIssue]: """Parse fact-checking result to extract validation issues.""" issues = [] # Simple parsing logic (would be more sophisticated in practice) content = validation_result.content.lower() if "error" in content or "incorrect" in content or "inaccurate" in content: issues.append( ValidationIssue( issue_id=f"fact_check_{len(issues)}", criteria=ValidationCriteria.FACTUAL_CORRECTNESS, severity=ValidationSeverity.HIGH, description="Potential factual inaccuracy detected", suggestion="Verify facts and correct any inaccuracies", confidence=validation_result.confidence, evidence=validation_result.content[:200] + "...", validator_model_id=validator_model_id ) ) return issues class BiasDetectionValidator(SpecializedValidator): """Specialized validator for bias detection.""" def __init__(self, model_provider: ModelProvider): super().__init__(ValidationCriteria.BIAS_NEUTRALITY, model_provider) async def validate( self, result: ProcessingResult, task_context: TaskContext, validator_model_id: str ) -> List[ValidationIssue]: """Detect potential biases in the result.""" bias_check_prompt = f""" Analyze the following response for potential biases: Original Task: {task_context.content} Response to Analyze: {result.content} Look for: 1. Cultural, gender, racial, or other demographic biases 2. Political or ideological biases 3. Unfair generalizations or stereotypes 4. Lack of diverse perspectives Identify any biases found and suggest improvements. """ bias_check_task = TaskContext( task_id=f"bias_check_{result.task_id}", task_type=TaskType.ANALYSIS, content=bias_check_prompt ) try: validation_result = await self.model_provider.process_task( bias_check_task, validator_model_id ) issues = self._parse_bias_result(validation_result, validator_model_id) return issues except Exception as e: logger.warning(f"Bias detection failed with model {validator_model_id}: {str(e)}") return [] def _parse_bias_result( self, validation_result: ProcessingResult, validator_model_id: str ) -> List[ValidationIssue]: """Parse bias detection result to extract issues.""" issues = [] content = validation_result.content.lower() bias_indicators = ["bias", "stereotype", "unfair", "prejudice", "discrimination"] if any(indicator in content for indicator in bias_indicators): issues.append( ValidationIssue( issue_id=f"bias_check_{len(issues)}", criteria=ValidationCriteria.BIAS_NEUTRALITY, severity=ValidationSeverity.MEDIUM, description="Potential bias detected in the response", suggestion="Review and revise to ensure neutrality and fairness", confidence=validation_result.confidence, evidence=validation_result.content[:200] + "...", validator_model_id=validator_model_id ) ) return issues class CrossValidator(CollectiveIntelligenceComponent): """ Cross-model validation system that uses multiple models to validate and improve the quality of AI-generated content. 
""" def __init__( self, model_provider: ModelProvider, config: Optional[ValidationConfig] = None ): super().__init__(model_provider) self.config = config or ValidationConfig() # Specialized validators self.specialized_validators = { ValidationCriteria.FACTUAL_CORRECTNESS: FactCheckValidator(model_provider), ValidationCriteria.BIAS_NEUTRALITY: BiasDetectionValidator(model_provider) } # Validation history self.validation_history: List[ValidationResult] = [] self.validator_performance: Dict[str, Dict[str, float]] = {} async def process( self, result: ProcessingResult, task_context: TaskContext, **kwargs ) -> ValidationResult: """ Perform cross-model validation on a result. Args: result: The result to validate task_context: Original task context **kwargs: Additional validation options Returns: ValidationResult with comprehensive validation analysis """ start_time = datetime.now() try: # Select validator models validator_models = await self._select_validator_models(result, task_context) # Perform validation using selected strategy validation_report = await self._perform_validation( result, task_context, validator_models ) # Calculate overall validation metrics validation_confidence = self._calculate_validation_confidence(validation_report) is_valid = self._determine_validity(validation_report) improvement_suggestions = self._generate_improvement_suggestions(validation_report) quality_metrics = self._calculate_validation_quality_metrics(validation_report) # Create final validation result processing_time = (datetime.now() - start_time).total_seconds() validation_result = ValidationResult( task_id=task_context.task_id, original_result=result, validation_report=validation_report, is_valid=is_valid, validation_confidence=validation_confidence, improvement_suggestions=improvement_suggestions, quality_metrics=quality_metrics, processing_time=processing_time, metadata={ 'validator_count': len(validator_models), 'total_issues': len(validation_report.issues), 'critical_issues': len([i for i in validation_report.issues if i.severity == ValidationSeverity.CRITICAL]) } ) # Update validation history and metrics self.validation_history.append(validation_result) self._update_validator_performance(validation_report) logger.info( f"Validation completed for task {task_context.task_id}: " f"{'VALID' if is_valid else 'INVALID'} " f"(confidence: {validation_confidence:.3f}, " f"issues: {len(validation_report.issues)})" ) return validation_result except Exception as e: logger.error(f"Validation failed for task {task_context.task_id}: {str(e)}") raise async def _select_validator_models( self, result: ProcessingResult, task_context: TaskContext ) -> List[str]: """Select appropriate validator models.""" available_models = await self.model_provider.get_available_models() # Filter out the original model if self-validation is disabled if not self.config.include_self_validation: available_models = [ model for model in available_models if model.model_id != result.model_id ] # Check for specialized validators specialized_models = [] for criteria in self.config.criteria: if criteria in self.config.specialized_validators: specialized_models.extend(self.config.specialized_validators[criteria]) # Score models for validation suitability scored_models = [] for model in available_models: score = self._calculate_validator_suitability(model, task_context, result) scored_models.append((model.model_id, score)) # Add specialized models with high scores for model_id in specialized_models: if model_id not in [m[0] for m in scored_models]: 
                scored_models.append((model_id, 0.9))  # High score for specialized validators

        # Sort by score and select top validators
        scored_models.sort(key=lambda x: x[1], reverse=True)

        selected_count = min(
            max(self.config.min_validators, len(scored_models)),
            self.config.max_validators
        )

        validator_models = [model_id for model_id, _ in scored_models[:selected_count]]

        logger.info(f"Selected {len(validator_models)} validators: {validator_models}")
        return validator_models

    def _calculate_validator_suitability(
        self,
        model: ModelInfo,
        task_context: TaskContext,
        result: ProcessingResult
    ) -> float:
        """Calculate how suitable a model is for validation."""
        base_score = 0.5

        # Higher accuracy models are better validators
        accuracy_bonus = model.accuracy_score * 0.3

        # Models with relevant capabilities score higher
        capability_bonus = 0.0
        relevant_capabilities = [ModelCapability.ACCURACY, ModelCapability.REASONING]
        for cap in relevant_capabilities:
            if cap in model.capabilities:
                capability_bonus += model.capabilities[cap] * 0.1

        # Check historical validation performance
        performance_bonus = 0.0
        if model.model_id in self.validator_performance:
            perf = self.validator_performance[model.model_id]
            performance_bonus = perf.get('accuracy', 0.0) * 0.2

        return min(1.0, base_score + accuracy_bonus + capability_bonus + performance_bonus)
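
    # Worked example of the suitability formula above (values are assumed for
    # illustration): a model with accuracy_score 0.9, ACCURACY and REASONING
    # capabilities both at 0.8, and a tracked validator accuracy of 0.7 scores
    #   0.5 + 0.9 * 0.3 + (0.8 + 0.8) * 0.1 + 0.7 * 0.2 = 1.07 -> capped at 1.0
    # so well-performing models saturate the score quickly.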
    async def _perform_validation(
        self,
        result: ProcessingResult,
        task_context: TaskContext,
        validator_models: List[str]
    ) -> ValidationReport:
        """Perform the actual validation using selected models."""
        if self.config.strategy == ValidationStrategy.PEER_REVIEW:
            return await self._peer_review_validation(result, task_context, validator_models)
        elif self.config.strategy == ValidationStrategy.ADVERSARIAL:
            return await self._adversarial_validation(result, task_context, validator_models)
        elif self.config.strategy == ValidationStrategy.CONSENSUS_CHECK:
            return await self._consensus_validation(result, task_context, validator_models)
        elif self.config.strategy == ValidationStrategy.FACT_CHECK:
            return await self._fact_check_validation(result, task_context, validator_models)
        elif self.config.strategy == ValidationStrategy.QUALITY_ASSURANCE:
            return await self._quality_assurance_validation(result, task_context, validator_models)
        elif self.config.strategy == ValidationStrategy.BIAS_DETECTION:
            return await self._bias_detection_validation(result, task_context, validator_models)
        else:
            # Default to peer review
            return await self._peer_review_validation(result, task_context, validator_models)

    async def _peer_review_validation(
        self,
        result: ProcessingResult,
        task_context: TaskContext,
        validator_models: List[str]
    ) -> ValidationReport:
        """Perform peer review validation."""
        all_issues = []
        criteria_scores = {}

        # Create validation tasks for each validator
        validation_tasks = []
        for validator_model_id in validator_models:
            task = self._create_peer_review_task(result, task_context, validator_model_id)
            validation_tasks.append((validator_model_id, task))

        # Execute validation tasks concurrently
        validation_results = await asyncio.gather(
            *[self._execute_validation_task(model_id, task) for model_id, task in validation_tasks],
            return_exceptions=True
        )

        # Process validation results
        for i, validation_result in enumerate(validation_results):
            if isinstance(validation_result, Exception):
                logger.warning(f"Validation failed for validator {validator_models[i]}: {str(validation_result)}")
                continue

            validator_model_id = validator_models[i]
            issues = self._parse_peer_review_result(validation_result, validator_model_id)
            all_issues.extend(issues)

        # Calculate criteria scores
        for criteria in self.config.criteria:
            criteria_issues = [issue for issue in all_issues if issue.criteria == criteria]
            criteria_scores[criteria] = self._calculate_criteria_score(criteria_issues)

        # Calculate overall score and consensus
        overall_score = self._calculate_overall_score(criteria_scores)
        consensus_level = self._calculate_consensus_level(all_issues, validator_models)

        return ValidationReport(
            original_result=result,
            task_context=task_context,
            validation_strategy=self.config.strategy,
            validator_models=validator_models,
            issues=all_issues,
            overall_score=overall_score,
            criteria_scores=criteria_scores,
            consensus_level=consensus_level,
            recommendations=self._generate_recommendations(all_issues)
        )

    def _create_peer_review_task(
        self,
        result: ProcessingResult,
        task_context: TaskContext,
        validator_model_id: str
    ) -> TaskContext:
        """Create a peer review validation task."""
        criteria_text = ", ".join([c.value for c in self.config.criteria])

        review_prompt = f"""
        Please review the following AI-generated response for quality and accuracy:

        Original Task: {task_context.content}

        Response to Review: {result.content}

        Evaluate the response based on these criteria: {criteria_text}

        For each criterion, provide:
        1. A score from 0.0 to 1.0
        2. Specific issues or concerns (if any)
        3. Suggestions for improvement

        Focus on being constructive and specific in your feedback.
        """

        return TaskContext(
            task_id=f"peer_review_{result.task_id}_{validator_model_id}",
            task_type=TaskType.ANALYSIS,
            content=review_prompt,
            metadata={'validation_type': 'peer_review', 'validator': validator_model_id}
        )

    async def _execute_validation_task(
        self,
        validator_model_id: str,
        validation_task: TaskContext
    ) -> ProcessingResult:
        """Execute a validation task with timeout."""
        try:
            return await asyncio.wait_for(
                self.model_provider.process_task(validation_task, validator_model_id),
                timeout=self.config.timeout_seconds
            )
        except asyncio.TimeoutError:
            raise Exception(f"Validation task timed out for model {validator_model_id}")

    def _parse_peer_review_result(
        self,
        validation_result: ProcessingResult,
        validator_model_id: str
    ) -> List[ValidationIssue]:
        """Parse peer review result to extract validation issues."""
        issues = []
        content = validation_result.content.lower()

        # Simple parsing logic (would be more sophisticated in practice)
        issue_indicators = {
            "error": ValidationSeverity.HIGH,
            "incorrect": ValidationSeverity.HIGH,
            "inaccurate": ValidationSeverity.MEDIUM,
            "unclear": ValidationSeverity.MEDIUM,
            "improve": ValidationSeverity.LOW,
            "suggest": ValidationSeverity.LOW
        }

        for indicator, severity in issue_indicators.items():
            if indicator in content:
                # Extract context around the indicator
                start_idx = max(0, content.find(indicator) - 50)
                end_idx = min(len(content), content.find(indicator) + 100)
                context = validation_result.content[start_idx:end_idx]

                issues.append(
                    ValidationIssue(
                        issue_id=f"peer_review_{len(issues)}_{validator_model_id}",
                        criteria=ValidationCriteria.ACCURACY,  # Default, would be more specific
                        severity=severity,
                        description=f"Peer reviewer identified: {indicator}",
                        suggestion="Review and address the identified concern",
                        confidence=validation_result.confidence,
                        evidence=context,
                        validator_model_id=validator_model_id
                    )
                )

        return issues

    async def _adversarial_validation(
        self,
        result: ProcessingResult,
        task_context: TaskContext,
        validator_models: List[str]
    ) -> ValidationReport:
        """Perform adversarial validation where models challenge the result."""
        adversarial_prompt = f"""
        Act as a critical reviewer and challenge the following response:
        Original Task: {task_context.content}

        Response to Challenge: {result.content}

        Your goal is to find flaws, inconsistencies, or weaknesses in the response.
        Be thorough and skeptical, but fair in your criticism.
        Identify specific issues and provide evidence for your challenges.
        """

        all_issues = []

        for validator_model_id in validator_models:
            adversarial_task = TaskContext(
                task_id=f"adversarial_{result.task_id}_{validator_model_id}",
                task_type=TaskType.ANALYSIS,
                content=adversarial_prompt
            )

            try:
                validation_result = await self._execute_validation_task(
                    validator_model_id, adversarial_task
                )
                issues = self._parse_adversarial_result(validation_result, validator_model_id)
                all_issues.extend(issues)
            except Exception as e:
                logger.warning(f"Adversarial validation failed for {validator_model_id}: {str(e)}")

        # Create validation report
        criteria_scores = {}
        for criteria in self.config.criteria:
            criteria_issues = [issue for issue in all_issues if issue.criteria == criteria]
            criteria_scores[criteria] = self._calculate_criteria_score(criteria_issues)

        overall_score = self._calculate_overall_score(criteria_scores)
        consensus_level = self._calculate_consensus_level(all_issues, validator_models)

        return ValidationReport(
            original_result=result,
            task_context=task_context,
            validation_strategy=self.config.strategy,
            validator_models=validator_models,
            issues=all_issues,
            overall_score=overall_score,
            criteria_scores=criteria_scores,
            consensus_level=consensus_level,
            recommendations=self._generate_recommendations(all_issues)
        )

    def _parse_adversarial_result(
        self,
        validation_result: ProcessingResult,
        validator_model_id: str
    ) -> List[ValidationIssue]:
        """Parse adversarial validation result."""
        issues = []
        content = validation_result.content.lower()

        # Look for adversarial challenge indicators
        challenge_indicators = ["flaw", "inconsist", "weak", "question", "challenge", "doubt"]

        for indicator in challenge_indicators:
            if indicator in content:
                issues.append(
                    ValidationIssue(
                        issue_id=f"adversarial_{len(issues)}_{validator_model_id}",
                        criteria=ValidationCriteria.LOGICAL_SOUNDNESS,
                        severity=ValidationSeverity.MEDIUM,
                        description=f"Adversarial challenge: {indicator} identified",
                        suggestion="Address the adversarial challenge raised",
                        confidence=validation_result.confidence,
                        evidence=validation_result.content[:300] + "...",
                        validator_model_id=validator_model_id
                    )
                )

        return issues

    async def _consensus_validation(
        self,
        result: ProcessingResult,
        task_context: TaskContext,
        validator_models: List[str]
    ) -> ValidationReport:
        """Validate by checking consensus among multiple responses."""
        # Generate alternative responses from validators
        alternative_responses = []

        for validator_model_id in validator_models:
            try:
                alternative_result = await self.model_provider.process_task(
                    task_context, validator_model_id
                )
                alternative_responses.append((validator_model_id, alternative_result))
            except Exception as e:
                logger.warning(f"Failed to generate alternative response from {validator_model_id}: {str(e)}")

        # Compare original result with alternatives
        issues = self._compare_for_consensus(result, alternative_responses)

        # Calculate consensus metrics
        consensus_level = len(alternative_responses) / max(len(validator_models), 1)
        criteria_scores = {criteria: 0.8 for criteria in self.config.criteria}  # Default scores
        overall_score = consensus_level

        return ValidationReport(
            original_result=result,
            task_context=task_context,
            validation_strategy=self.config.strategy,
            validator_models=validator_models,
            issues=issues,
            overall_score=overall_score,
            criteria_scores=criteria_scores,
            consensus_level=consensus_level,
            recommendations=self._generate_recommendations(issues)
        )

    def _compare_for_consensus(
        self,
        original_result: ProcessingResult,
        alternative_responses: List[Tuple[str, ProcessingResult]]
    ) -> List[ValidationIssue]:
        """Compare original result with alternatives to find consensus issues."""
        issues = []

        if len(alternative_responses) < 2:
            issues.append(
                ValidationIssue(
                    issue_id="consensus_insufficient",
                    criteria=ValidationCriteria.CONSISTENCY,
                    severity=ValidationSeverity.MEDIUM,
                    description="Insufficient alternative responses for consensus validation",
                    suggestion="Increase the number of validator models",
                    confidence=1.0,
                    evidence="",
                    validator_model_id="system"
                )
            )
            return issues

        # Simple consensus check based on response length similarity
        original_length = len(original_result.content)
        alternative_lengths = [len(resp[1].content) for resp in alternative_responses]
        avg_alternative_length = statistics.mean(alternative_lengths)

        length_diff_ratio = abs(original_length - avg_alternative_length) / max(avg_alternative_length, 1)

        if length_diff_ratio > 0.5:  # Significant length difference
            issues.append(
                ValidationIssue(
                    issue_id="consensus_length_outlier",
                    criteria=ValidationCriteria.COMPLETENESS,
                    severity=ValidationSeverity.MEDIUM,
                    description="Response length significantly differs from consensus",
                    suggestion="Review completeness compared to alternative responses",
                    confidence=0.7,
                    evidence=f"Original: {original_length} chars, Average alternative: {avg_alternative_length:.0f} chars",
                    validator_model_id="consensus_checker"
                )
            )

        return issues

    async def _fact_check_validation(
        self,
        result: ProcessingResult,
        task_context: TaskContext,
        validator_models: List[str]
    ) -> ValidationReport:
        """Perform specialized fact-checking validation."""
        if ValidationCriteria.FACTUAL_CORRECTNESS in self.specialized_validators:
            fact_checker = self.specialized_validators[ValidationCriteria.FACTUAL_CORRECTNESS]
            all_issues = []

            for validator_model_id in validator_models:
                try:
                    issues = await fact_checker.validate(result, task_context, validator_model_id)
                    all_issues.extend(issues)
                except Exception as e:
                    logger.warning(f"Fact-checking failed with {validator_model_id}: {str(e)}")

            criteria_scores = {ValidationCriteria.FACTUAL_CORRECTNESS: self._calculate_criteria_score(all_issues)}
            overall_score = criteria_scores[ValidationCriteria.FACTUAL_CORRECTNESS]

            return ValidationReport(
                original_result=result,
                task_context=task_context,
                validation_strategy=self.config.strategy,
                validator_models=validator_models,
                issues=all_issues,
                overall_score=overall_score,
                criteria_scores=criteria_scores,
                consensus_level=1.0,  # Single criteria validation
                recommendations=self._generate_recommendations(all_issues)
            )
        else:
            # Fallback to peer review
            return await self._peer_review_validation(result, task_context, validator_models)

    async def _quality_assurance_validation(
        self,
        result: ProcessingResult,
        task_context: TaskContext,
        validator_models: List[str]
    ) -> ValidationReport:
        """Perform comprehensive quality assurance validation."""
        # Similar to peer review but with more focus on quality metrics
        return await self._peer_review_validation(result, task_context, validator_models)

    async def _bias_detection_validation(
        self,
        result: ProcessingResult,
        task_context: TaskContext,
        validator_models: List[str]
    ) -> ValidationReport:
        """Perform bias detection validation."""
        if ValidationCriteria.BIAS_NEUTRALITY in self.specialized_validators:
            bias_detector = self.specialized_validators[ValidationCriteria.BIAS_NEUTRALITY]
            all_issues = []

            for validator_model_id in validator_models:
                try:
                    issues = await bias_detector.validate(result, task_context, validator_model_id)
                    all_issues.extend(issues)
                except Exception as e:
                    logger.warning(f"Bias detection failed with {validator_model_id}: {str(e)}")

            criteria_scores = {ValidationCriteria.BIAS_NEUTRALITY: self._calculate_criteria_score(all_issues)}
            overall_score = criteria_scores[ValidationCriteria.BIAS_NEUTRALITY]

            return ValidationReport(
                original_result=result,
                task_context=task_context,
                validation_strategy=self.config.strategy,
                validator_models=validator_models,
                issues=all_issues,
                overall_score=overall_score,
                criteria_scores=criteria_scores,
                consensus_level=1.0,
                recommendations=self._generate_recommendations(all_issues)
            )
        else:
            return await self._peer_review_validation(result, task_context, validator_models)

    def _calculate_criteria_score(self, criteria_issues: List[ValidationIssue]) -> float:
        """Calculate score for a specific criteria based on issues found."""
        if not criteria_issues:
            return 1.0  # Perfect score if no issues

        # Weight issues by severity
        severity_weights = {
            ValidationSeverity.CRITICAL: -0.5,
            ValidationSeverity.HIGH: -0.3,
            ValidationSeverity.MEDIUM: -0.2,
            ValidationSeverity.LOW: -0.1,
            ValidationSeverity.INFO: 0.0
        }

        total_deduction = sum(severity_weights[issue.severity] for issue in criteria_issues)
        score = max(0.0, 1.0 + total_deduction)

        return score

    def _calculate_overall_score(self, criteria_scores: Dict[ValidationCriteria, float]) -> float:
        """Calculate overall validation score from criteria scores."""
        if not criteria_scores:
            return 0.0
        return statistics.mean(criteria_scores.values())

    def _calculate_consensus_level(
        self,
        all_issues: List[ValidationIssue],
        validator_models: List[str]
    ) -> float:
        """Calculate level of consensus among validators."""
        if not validator_models:
            return 0.0

        # Group issues by validator
        validator_issue_counts = {}
        for validator_id in validator_models:
            validator_issue_counts[validator_id] = len([
                issue for issue in all_issues
                if issue.validator_model_id == validator_id
            ])

        if not validator_issue_counts:
            return 1.0

        # Calculate consensus based on agreement in issue detection
        issue_counts = list(validator_issue_counts.values())
        if not issue_counts:
            return 1.0

        avg_issues = statistics.mean(issue_counts)
        max_issues = max(issue_counts)

        if max_issues == 0:
            return 1.0  # All validators agree (no issues)

        if len(issue_counts) < 2:
            return 1.0  # stdev is undefined for a single validator

        consensus = 1.0 - (statistics.stdev(issue_counts) / max(avg_issues, 1))
        return max(0.0, min(1.0, consensus))

    def _calculate_validation_confidence(self, validation_report: ValidationReport) -> float:
        """Calculate confidence in the validation result."""
        base_confidence = validation_report.overall_score

        # Adjust based on consensus level
        consensus_factor = validation_report.consensus_level

        # Adjust based on number of validators
        validator_factor = min(1.0, len(validation_report.validator_models) / self.config.min_validators)

        confidence = base_confidence * consensus_factor * validator_factor
        return min(1.0, confidence)
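
    # Worked example of the scoring pipeline above, with assumed inputs and the
    # default config: two validators each flag one issue, one HIGH and one LOW,
    # both parsed as ACCURACY. Then
    #   ACCURACY score = 1.0 - 0.3 - 0.1 = 0.6, the other three default
    #   criteria stay at 1.0, and overall = (0.6 + 1.0 + 1.0 + 1.0) / 4 = 0.9.
    # Per-validator issue counts are [1, 1], so stdev is 0 and consensus is
    # 1.0; with min_validators = 2 the validator factor is also 1.0, giving
    #   confidence = 0.9 * 1.0 * 1.0 = 0.9.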
    def _determine_validity(self, validation_report: ValidationReport) -> bool:
        """Determine if the result is valid based on validation report."""
        # Check for critical issues
        critical_issues = [
            issue for issue in validation_report.issues
            if issue.severity == ValidationSeverity.CRITICAL
        ]
        if critical_issues:
            return False

        # Check overall score against threshold
        if validation_report.overall_score < self.config.confidence_threshold:
            return False

        # Check consensus requirement
        if self.config.require_consensus:
            if validation_report.consensus_level < self.config.consensus_threshold:
                return False

        return True

    def _generate_improvement_suggestions(self, validation_report: ValidationReport) -> List[str]:
        """Generate improvement suggestions based on validation issues."""
        suggestions = []

        # Group issues by severity
        critical_issues = [i for i in validation_report.issues if i.severity == ValidationSeverity.CRITICAL]
        high_issues = [i for i in validation_report.issues if i.severity == ValidationSeverity.HIGH]

        if critical_issues:
            suggestions.append("Address critical issues immediately before using this result")
            for issue in critical_issues[:3]:  # Top 3 critical issues
                suggestions.append(f"Critical: {issue.suggestion}")

        if high_issues:
            suggestions.append("Review and fix high-priority issues")
            for issue in high_issues[:3]:  # Top 3 high issues
                suggestions.append(f"High: {issue.suggestion}")

        if validation_report.overall_score < 0.8:
            suggestions.append("Consider regenerating the response with different parameters")

        if validation_report.consensus_level < 0.7:
            suggestions.append("Seek additional validation due to low consensus among validators")

        return suggestions

    def _generate_recommendations(self, issues: List[ValidationIssue]) -> List[str]:
        """Generate general recommendations from validation issues."""
        recommendations = []

        # Group by criteria
        criteria_issues = {}
        for issue in issues:
            if issue.criteria not in criteria_issues:
                criteria_issues[issue.criteria] = []
            criteria_issues[issue.criteria].append(issue)

        for criteria, criteria_issue_list in criteria_issues.items():
            if len(criteria_issue_list) > 0:
                recommendations.append(
                    f"Focus on improving {criteria.value} (found {len(criteria_issue_list)} issues)"
                )

        return recommendations

    def _calculate_validation_quality_metrics(self, validation_report: ValidationReport) -> QualityMetrics:
        """Calculate quality metrics based on validation results."""
        accuracy = validation_report.criteria_scores.get(ValidationCriteria.ACCURACY, 0.5)
        consistency = validation_report.criteria_scores.get(ValidationCriteria.CONSISTENCY, 0.5)
        completeness = validation_report.criteria_scores.get(ValidationCriteria.COMPLETENESS, 0.5)
        relevance = validation_report.criteria_scores.get(ValidationCriteria.RELEVANCE, 0.5)
        confidence = validation_report.overall_score
        coherence = validation_report.criteria_scores.get(ValidationCriteria.COHERENCE, 0.5)

        return QualityMetrics(
            accuracy=accuracy,
            consistency=consistency,
            completeness=completeness,
            relevance=relevance,
            confidence=confidence,
            coherence=coherence
        )

    def _update_validator_performance(self, validation_report: ValidationReport) -> None:
        """Update performance tracking for validator models."""
        for validator_id in validation_report.validator_models:
            if validator_id not in self.validator_performance:
                self.validator_performance[validator_id] = {
                    'validation_count': 0,
                    'accuracy': 0.5,
                    'consistency': 0.5
                }

            perf = self.validator_performance[validator_id]
            perf['validation_count'] += 1

            # Update accuracy based on validation quality
            new_accuracy = validation_report.overall_score
            count = perf['validation_count']
            perf['accuracy'] = (perf['accuracy'] * (count - 1) + new_accuracy) / count
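
    # Worked example of the running-average update above (assumed values): a
    # validator with tracked accuracy 0.5 after 1 validation that participates
    # in a validation scoring 0.9 moves to (0.5 * 1 + 0.9) / 2 = 0.7.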
    def get_validation_history(self, limit: Optional[int] = None) -> List[ValidationResult]:
        """Get historical validation results."""
        if limit:
            return self.validation_history[-limit:]
        return self.validation_history.copy()

    def get_validator_performance(self) -> Dict[str, Dict[str, float]]:
        """Get performance statistics for validator models."""
        return self.validator_performance.copy()

    def configure_validation(self, **config_updates) -> None:
        """Update validation configuration."""
        for key, value in config_updates.items():
            if hasattr(self.config, key):
                setattr(self.config, key, value)

        logger.info(f"Updated validation configuration: {config_updates}")
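
# Illustrative end-to-end sketch, not part of the original module. The
# concrete ModelProvider is hypothetical: its interface is assumed here only
# from the calls this module makes (`get_available_models()` and an awaitable
# `process_task(task, model_id)` returning a ProcessingResult).
async def _example_validate_answer(provider: ModelProvider) -> ValidationResult:
    """Generate an answer with one model, then cross-validate it."""
    task = TaskContext(
        task_id="demo-1",
        task_type=TaskType.FACTUAL,
        content="What year did the first Moon landing take place?",
    )
    # The answer to validate would normally come from an earlier pipeline step;
    # "model-a" is a placeholder model id.
    answer = await provider.process_task(task, "model-a")

    validator = CrossValidator(provider, make_strict_validation_config())
    outcome = await validator.process(answer, task)

    logger.info(
        f"valid={outcome.is_valid} confidence={outcome.validation_confidence:.2f}"
    )
    for suggestion in outcome.improvement_suggestions:
        logger.info(f"- {suggestion}")
    return outcome

# Running the sketch requires a concrete provider implementation, e.g.:
#   asyncio.run(_example_validate_answer(MyProvider()))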