consensus_engine.py
""" Multi-Model Consensus Engine This module implements a consensus mechanism that aggregates responses from multiple AI models to produce more reliable and accurate results through collective decision-making. """ import asyncio from dataclasses import dataclass, field from enum import Enum from typing import Any, Dict, List, Optional, Set from datetime import datetime import statistics import logging from .base import ( CollectiveIntelligenceComponent, TaskContext, ProcessingResult, ModelProvider, QualityMetrics ) logger = logging.getLogger(__name__) class ConsensusStrategy(Enum): """Strategies for building consensus among models.""" MAJORITY_VOTE = "majority_vote" WEIGHTED_AVERAGE = "weighted_average" CONFIDENCE_THRESHOLD = "confidence_threshold" SEMANTIC_SIMILARITY = "semantic_similarity" EXPERT_SELECTION = "expert_selection" class AgreementLevel(Enum): """Levels of agreement among models.""" UNANIMOUS = "unanimous" # 100% agreement HIGH_CONSENSUS = "high_consensus" # 80%+ agreement MODERATE_CONSENSUS = "moderate_consensus" # 60%+ agreement LOW_CONSENSUS = "low_consensus" # 40%+ agreement NO_CONSENSUS = "no_consensus" # <40% agreement @dataclass class ConsensusConfig: """Configuration for consensus building.""" strategy: ConsensusStrategy = ConsensusStrategy.MAJORITY_VOTE min_models: int = 3 max_models: int = 5 confidence_threshold: float = 0.7 agreement_threshold: float = 0.6 timeout_seconds: float = 30.0 retry_attempts: int = 2 model_weights: Dict[str, float] = field(default_factory=dict) exclude_models: Set[str] = field(default_factory=set) @dataclass class ModelResponse: """Response from a single model in consensus building.""" model_id: str result: ProcessingResult weight: float = 1.0 reliability_score: float = 1.0 @dataclass class ConsensusResult: """Result of consensus building process.""" task_id: str consensus_content: str agreement_level: AgreementLevel confidence_score: float participating_models: List[str] model_responses: List[ModelResponse] strategy_used: ConsensusStrategy processing_time: float quality_metrics: QualityMetrics metadata: Dict[str, Any] = field(default_factory=dict) timestamp: datetime = field(default_factory=datetime.now) class ConsensusEngine(CollectiveIntelligenceComponent): """ Multi-model consensus engine that aggregates responses from multiple AI models to produce more reliable and accurate results. """ def __init__(self, model_provider: ModelProvider, config: Optional[ConsensusConfig] = None): super().__init__(model_provider) self.config = config or ConsensusConfig() self.consensus_history: List[ConsensusResult] = [] self.model_reliability: Dict[str, float] = {} async def process(self, task: TaskContext, **kwargs) -> ConsensusResult: """ Build consensus among multiple models for the given task. 
Args: task: The task to process **kwargs: Additional configuration options Returns: ConsensusResult containing the consensus and metadata """ start_time = datetime.now() try: # Select models for consensus models = await self._select_models(task) logger.info(f"Selected {len(models)} models for consensus: {[m.model_id for m in models]}") # Get responses from all models model_responses = await self._get_model_responses(task, models) # Build consensus consensus_result = await self._build_consensus(task, model_responses) # Update metrics and history processing_time = (datetime.now() - start_time).total_seconds() consensus_result.processing_time = processing_time self._update_model_reliability(model_responses, consensus_result) self.consensus_history.append(consensus_result) logger.info(f"Consensus completed: {consensus_result.agreement_level.value}, " f"confidence: {consensus_result.confidence_score:.3f}") return consensus_result except Exception as e: logger.error(f"Consensus building failed: {str(e)}") raise async def _select_models(self, task: TaskContext) -> List[str]: """Select appropriate models for consensus building.""" available_models = await self.model_provider.get_available_models() # Filter excluded models eligible_models = [ model for model in available_models if model.model_id not in self.config.exclude_models ] # Sort by relevance to task type and reliability scored_models = [] for model in eligible_models: relevance_score = self._calculate_model_relevance(model, task) reliability_score = self.model_reliability.get(model.model_id, 1.0) total_score = relevance_score * reliability_score scored_models.append((model.model_id, total_score)) # Select top models within configured limits scored_models.sort(key=lambda x: x[1], reverse=True) selected_count = min( max(self.config.min_models, len(scored_models)), self.config.max_models ) return [model_id for model_id, _ in scored_models[:selected_count]] def _calculate_model_relevance(self, model, task: TaskContext) -> float: """Calculate how relevant a model is for the given task.""" # This is a simplified relevance calculation # In practice, this would use more sophisticated matching base_score = 0.5 # Boost score based on task type and model capabilities if hasattr(model, 'capabilities'): task_type_mapping = { 'reasoning': 'reasoning', 'creative': 'creativity', 'factual': 'accuracy', 'code_generation': 'code' } relevant_capability = task_type_mapping.get(task.task_type.value) if relevant_capability and relevant_capability in model.capabilities: base_score += model.capabilities[relevant_capability] * 0.5 return min(1.0, base_score) async def _get_model_responses( self, task: TaskContext, model_ids: List[str] ) -> List[ModelResponse]: """Get responses from all selected models concurrently.""" async def get_single_response(model_id: str) -> Optional[ModelResponse]: try: result = await asyncio.wait_for( self.model_provider.process_task(task, model_id), timeout=self.config.timeout_seconds ) weight = self.config.model_weights.get(model_id, 1.0) reliability = self.model_reliability.get(model_id, 1.0) return ModelResponse( model_id=model_id, result=result, weight=weight, reliability_score=reliability ) except Exception as e: logger.warning(f"Model {model_id} failed to respond: {str(e)}") return None # Execute all model calls concurrently tasks = [get_single_response(model_id) for model_id in model_ids] responses = await asyncio.gather(*tasks, return_exceptions=True) # Filter out failed responses valid_responses = [ response for response in 
responses if isinstance(response, ModelResponse) ] if len(valid_responses) < self.config.min_models: raise ValueError( f"Insufficient responses for consensus: got {len(valid_responses)}, " f"need at least {self.config.min_models}" ) return valid_responses async def _build_consensus( self, task: TaskContext, responses: List[ModelResponse] ) -> ConsensusResult: """Build consensus from model responses using the configured strategy.""" if self.config.strategy == ConsensusStrategy.MAJORITY_VOTE: return self._majority_vote_consensus(task, responses) elif self.config.strategy == ConsensusStrategy.WEIGHTED_AVERAGE: return self._weighted_average_consensus(task, responses) elif self.config.strategy == ConsensusStrategy.CONFIDENCE_THRESHOLD: return self._confidence_threshold_consensus(task, responses) else: # Default to majority vote return self._majority_vote_consensus(task, responses) def _majority_vote_consensus( self, task: TaskContext, responses: List[ModelResponse] ) -> ConsensusResult: """Build consensus using majority vote strategy.""" # Group similar responses (simplified approach) response_groups = self._group_similar_responses(responses) # Find the group with highest total weight best_group = max(response_groups, key=lambda g: sum(r.weight for r in g)) # Calculate agreement level total_weight = sum(r.weight for r in responses) consensus_weight = sum(r.weight for r in best_group) agreement_ratio = consensus_weight / total_weight agreement_level = self._calculate_agreement_level(agreement_ratio) # Select best response from winning group best_response = max(best_group, key=lambda r: r.result.confidence * r.reliability_score) # Calculate consensus confidence consensus_confidence = self._calculate_consensus_confidence(best_group, responses) # Calculate quality metrics quality_metrics = self._calculate_quality_metrics(best_group, responses) return ConsensusResult( task_id=task.task_id, consensus_content=best_response.result.content, agreement_level=agreement_level, confidence_score=consensus_confidence, participating_models=[r.model_id for r in responses], model_responses=responses, strategy_used=self.config.strategy, processing_time=0.0, # Will be set by caller quality_metrics=quality_metrics ) def _weighted_average_consensus( self, task: TaskContext, responses: List[ModelResponse] ) -> ConsensusResult: """Build consensus using weighted average strategy.""" # This is a simplified implementation # In practice, this would need semantic averaging total_weight = sum(r.weight * r.reliability_score for r in responses) if total_weight == 0: raise ValueError("Total weight is zero, cannot compute weighted average") # For now, select the response with highest weighted confidence best_response = max( responses, key=lambda r: r.result.confidence * r.weight * r.reliability_score ) # Calculate average confidence avg_confidence = sum( r.result.confidence * r.weight * r.reliability_score for r in responses ) / total_weight quality_metrics = self._calculate_quality_metrics(responses, responses) return ConsensusResult( task_id=task.task_id, consensus_content=best_response.result.content, agreement_level=AgreementLevel.MODERATE_CONSENSUS, confidence_score=avg_confidence, participating_models=[r.model_id for r in responses], model_responses=responses, strategy_used=self.config.strategy, processing_time=0.0, quality_metrics=quality_metrics ) def _confidence_threshold_consensus( self, task: TaskContext, responses: List[ModelResponse] ) -> ConsensusResult: """Build consensus using confidence threshold strategy.""" # 
Filter responses by confidence threshold high_confidence_responses = [ r for r in responses if r.result.confidence >= self.config.confidence_threshold ] if not high_confidence_responses: # Fall back to best available response high_confidence_responses = [max(responses, key=lambda r: r.result.confidence)] # Use majority vote among high-confidence responses return self._majority_vote_consensus(task, high_confidence_responses) def _group_similar_responses(self, responses: List[ModelResponse]) -> List[List[ModelResponse]]: """Group similar responses together (simplified implementation).""" # This is a very simplified grouping based on response length similarity # In practice, this would use semantic similarity groups = [] for response in responses: added_to_group = False for group in groups: # Simple similarity check based on content length if any(abs(len(response.result.content) - len(r.result.content)) < 50 for r in group): group.append(response) added_to_group = True break if not added_to_group: groups.append([response]) return groups def _calculate_agreement_level(self, agreement_ratio: float) -> AgreementLevel: """Calculate agreement level based on consensus ratio.""" if agreement_ratio >= 1.0: return AgreementLevel.UNANIMOUS elif agreement_ratio >= 0.8: return AgreementLevel.HIGH_CONSENSUS elif agreement_ratio >= 0.6: return AgreementLevel.MODERATE_CONSENSUS elif agreement_ratio >= 0.4: return AgreementLevel.LOW_CONSENSUS else: return AgreementLevel.NO_CONSENSUS def _calculate_consensus_confidence( self, consensus_group: List[ModelResponse], all_responses: List[ModelResponse] ) -> float: """Calculate confidence score for the consensus.""" if not consensus_group: return 0.0 # Average confidence of consensus group group_confidence = statistics.mean(r.result.confidence for r in consensus_group) # Weight by group size relative to total size_weight = len(consensus_group) / len(all_responses) # Weight by reliability scores reliability_weight = statistics.mean(r.reliability_score for r in consensus_group) return group_confidence * size_weight * reliability_weight def _calculate_quality_metrics( self, consensus_group: List[ModelResponse], all_responses: List[ModelResponse] ) -> QualityMetrics: """Calculate quality metrics for the consensus.""" if not consensus_group: return QualityMetrics() # Calculate metrics based on response characteristics confidences = [r.result.confidence for r in consensus_group] accuracy = statistics.mean(confidences) consistency = 1.0 - (statistics.stdev(confidences) if len(confidences) > 1 else 0.0) completeness = min(1.0, len(consensus_group) / len(all_responses)) relevance = accuracy # Simplified confidence = statistics.mean(confidences) coherence = consistency # Simplified return QualityMetrics( accuracy=accuracy, consistency=consistency, completeness=completeness, relevance=relevance, confidence=confidence, coherence=coherence ) def _update_model_reliability( self, responses: List[ModelResponse], consensus: ConsensusResult ) -> None: """Update model reliability scores based on consensus participation.""" for response in responses: current_reliability = self.model_reliability.get(response.model_id, 1.0) # Models that contributed to consensus get reliability boost if response.model_id in consensus.participating_models: # Small positive adjustment new_reliability = min(1.0, current_reliability + 0.01) else: # Small negative adjustment for non-contributing models new_reliability = max(0.1, current_reliability - 0.005) self.model_reliability[response.model_id] = 
new_reliability def get_consensus_history(self, limit: Optional[int] = None) -> List[ConsensusResult]: """Get historical consensus results.""" if limit: return self.consensus_history[-limit:] return self.consensus_history.copy() def get_model_reliability_scores(self) -> Dict[str, float]: """Get current model reliability scores.""" return self.model_reliability.copy()
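
For reference, a minimal usage sketch of the engine above. It is illustrative only and rests on assumptions not shown in this file: the package name, MyModelProvider (a stand-in for a concrete ModelProvider implementation), TaskType, and the TaskContext constructor fields are all hypothetical and should be adjusted to the actual codebase.

# Usage sketch. Only ConsensusEngine, ConsensusConfig, and ConsensusStrategy come
# from the module above; the package path, MyModelProvider, TaskType, and the
# TaskContext fields are assumptions.
import asyncio

from collective_intelligence.base import TaskContext, TaskType            # package name assumed
from collective_intelligence.consensus_engine import (
    ConsensusConfig,
    ConsensusEngine,
    ConsensusStrategy,
)
from my_providers import MyModelProvider                                  # hypothetical ModelProvider


async def main() -> None:
    engine = ConsensusEngine(
        model_provider=MyModelProvider(),
        config=ConsensusConfig(
            strategy=ConsensusStrategy.CONFIDENCE_THRESHOLD,
            min_models=3,
            confidence_threshold=0.75,
        ),
    )

    # TaskContext fields inferred from how the engine uses them (task_id, task_type, content).
    task = TaskContext(
        task_id="demo-1",
        task_type=TaskType.REASONING,
        content="Compare quicksort and mergesort on nearly sorted input.",
    )

    result = await engine.process(task)
    print(result.agreement_level.value, round(result.confidence_score, 3))
    print(result.consensus_content)


if __name__ == "__main__":
    asyncio.run(main())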


MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/physics91/openrouter-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.