Skip to main content
Glama
test_ensemble_reasoning.py35.5 kB
""" Comprehensive test suite for the Ensemble Reasoning module. This module provides thorough testing of the intelligent task decomposition and specialized model routing functionality. """ import asyncio import pytest from datetime import datetime, timedelta from typing import List from unittest.mock import AsyncMock, Mock from src.openrouter_mcp.collective_intelligence.ensemble_reasoning import ( EnsembleReasoner, TaskDecomposer, ModelAssigner, DecompositionStrategy, TaskPriority, SubTask, ModelAssignment, EnsembleTask, EnsembleResult, SubTaskResult ) from src.openrouter_mcp.collective_intelligence.base import ( TaskContext, ProcessingResult, ModelInfo, TaskType, ModelCapability ) class TestTaskDecomposer: """Test suite for the TaskDecomposer class.""" @pytest.mark.unit def test_task_decomposer_initialization(self): """Test that TaskDecomposer initializes correctly.""" decomposer = TaskDecomposer() assert hasattr(decomposer, 'decomposition_rules') assert isinstance(decomposer.decomposition_rules, dict) assert TaskType.REASONING in decomposer.decomposition_rules assert TaskType.CREATIVE in decomposer.decomposition_rules @pytest.mark.unit def test_select_decomposition_strategy_reasoning(self): """Test strategy selection for reasoning tasks.""" decomposer = TaskDecomposer() task = TaskContext( task_id="test_reasoning", task_type=TaskType.REASONING, content="Analyze the pros and cons of renewable energy" ) strategy = decomposer._select_decomposition_strategy(task) assert strategy == DecompositionStrategy.SEQUENTIAL @pytest.mark.unit def test_select_decomposition_strategy_creative(self): """Test strategy selection for creative tasks.""" decomposer = TaskDecomposer() task = TaskContext( task_id="test_creative", task_type=TaskType.CREATIVE, content="Write a creative story about AI" ) strategy = decomposer._select_decomposition_strategy(task) assert strategy == DecompositionStrategy.HIERARCHICAL @pytest.mark.unit def test_select_decomposition_strategy_complex_urgent(self): """Test strategy selection for complex urgent tasks.""" decomposer = TaskDecomposer() task = TaskContext( task_id="test_urgent", task_type=TaskType.ANALYSIS, content="A" * 1500, # Long content deadline=datetime.now() + timedelta(minutes=30) # Urgent deadline ) strategy = decomposer._select_decomposition_strategy(task) assert strategy == DecompositionStrategy.PARALLEL @pytest.mark.asyncio @pytest.mark.unit async def test_decompose_sequential_reasoning(self): """Test sequential decomposition for reasoning tasks.""" decomposer = TaskDecomposer() task = TaskContext( task_id="test_reasoning", task_type=TaskType.REASONING, content="Analyze the impact of climate change on agriculture" ) sub_tasks = await decomposer._decompose_sequential(task) assert isinstance(sub_tasks, list) assert len(sub_tasks) >= 3 assert all(isinstance(st, SubTask) for st in sub_tasks) assert all(st.parent_task_id == task.task_id for st in sub_tasks) # Check dependency chain for i, sub_task in enumerate(sub_tasks[1:], 1): assert f"{task.task_id}_seq_{i}" in sub_task.dependencies @pytest.mark.asyncio @pytest.mark.unit async def test_decompose_parallel_analysis(self): """Test parallel decomposition for analysis tasks.""" decomposer = TaskDecomposer() task = TaskContext( task_id="test_analysis", task_type=TaskType.ANALYSIS, content="Analyze market trends in renewable energy sector" ) sub_tasks = await decomposer._decompose_parallel(task) assert isinstance(sub_tasks, list) assert len(sub_tasks) >= 3 assert all(isinstance(st, SubTask) for st in sub_tasks) # Parallel tasks should have no dependencies assert all(len(st.dependencies) == 0 for st in sub_tasks) @pytest.mark.asyncio @pytest.mark.unit async def test_decompose_hierarchical_creative(self): """Test hierarchical decomposition for creative tasks.""" decomposer = TaskDecomposer() task = TaskContext( task_id="test_creative", task_type=TaskType.CREATIVE, content="Create a comprehensive marketing campaign for a new product" ) sub_tasks = await decomposer._decompose_hierarchical(task) assert isinstance(sub_tasks, list) assert len(sub_tasks) >= 6 # Main categories + sub-categories # Check that there are both main and sub-category tasks main_tasks = [st for st in sub_tasks if len(st.dependencies) == 0] sub_category_tasks = [st for st in sub_tasks if len(st.dependencies) > 0] assert len(main_tasks) >= 3 assert len(sub_category_tasks) >= 3 @pytest.mark.asyncio @pytest.mark.integration async def test_decompose_task_full_process(self): """Test the complete task decomposition process.""" decomposer = TaskDecomposer() task = TaskContext( task_id="test_full_decomposition", task_type=TaskType.CODE_GENERATION, content="Build a REST API for user management with authentication" ) ensemble_task = await decomposer.decompose_task(task) assert isinstance(ensemble_task, EnsembleTask) assert ensemble_task.task_id == task.task_id assert ensemble_task.original_task == task assert isinstance(ensemble_task.decomposition_strategy, DecompositionStrategy) assert len(ensemble_task.sub_tasks) >= 3 assert all(isinstance(st, SubTask) for st in ensemble_task.sub_tasks) @pytest.mark.unit def test_get_phase_capabilities(self): """Test capability assignment for different phases.""" decomposer = TaskDecomposer() # Test analysis phase analysis_caps = decomposer._get_phase_capabilities("analyze the problem", TaskType.REASONING) assert ModelCapability.REASONING in analysis_caps assert ModelCapability.ACCURACY in analysis_caps # Test implementation phase for code impl_caps = decomposer._get_phase_capabilities("implement code", TaskType.CODE_GENERATION) assert ModelCapability.CODE in impl_caps # Test validation phase val_caps = decomposer._get_phase_capabilities("test and validate", TaskType.CODE_GENERATION) assert ModelCapability.ACCURACY in val_caps assert ModelCapability.REASONING in val_caps class TestModelAssigner: """Test suite for the ModelAssigner class.""" @pytest.mark.unit def test_model_assigner_initialization(self, mock_model_provider): """Test that ModelAssigner initializes correctly.""" assigner = ModelAssigner(mock_model_provider) assert assigner.model_provider == mock_model_provider assert isinstance(assigner.assignment_history, list) assert len(assigner.assignment_history) == 0 @pytest.mark.unit def test_calculate_model_score(self, mock_model_provider, sample_models): """Test model scoring algorithm.""" assigner = ModelAssigner(mock_model_provider) sub_task = SubTask( sub_task_id="test_subtask", parent_task_id="test_parent", content="Analyze data patterns", task_type=TaskType.ANALYSIS, required_capabilities=[ModelCapability.REASONING, ModelCapability.ACCURACY], priority=TaskPriority.HIGH ) # Test with a model that has matching capabilities model = sample_models[0] # GPT-4 with strong reasoning score = assigner._calculate_model_score(model, sub_task) assert 0.0 <= score <= 1.0 assert score > 0.5 # Should be reasonably high for matching capabilities @pytest.mark.unit def test_estimate_cost(self, mock_model_provider, sample_models): """Test cost estimation for model assignments.""" assigner = ModelAssigner(mock_model_provider) sub_task = SubTask( sub_task_id="test_subtask", parent_task_id="test_parent", content="This is a test task with some content to analyze", task_type=TaskType.ANALYSIS, required_capabilities=[ModelCapability.REASONING] ) model = sample_models[0] cost = assigner._estimate_cost(model, sub_task) assert cost >= 0.0 assert isinstance(cost, float) @pytest.mark.unit def test_estimate_time(self, mock_model_provider, sample_models): """Test time estimation for model assignments.""" assigner = ModelAssigner(mock_model_provider) sub_task = SubTask( sub_task_id="test_subtask", parent_task_id="test_parent", content="Short task", task_type=TaskType.REASONING, required_capabilities=[ModelCapability.REASONING] ) model = sample_models[0] time = assigner._estimate_time(model, sub_task) assert time > 0.0 assert isinstance(time, float) @pytest.mark.asyncio @pytest.mark.unit async def test_assign_single_model(self, mock_model_provider, sample_models): """Test assignment of a single model to a sub-task.""" assigner = ModelAssigner(mock_model_provider) sub_task = SubTask( sub_task_id="test_subtask", parent_task_id="test_parent", content="Analyze the given data", task_type=TaskType.ANALYSIS, required_capabilities=[ModelCapability.REASONING, ModelCapability.ACCURACY], priority=TaskPriority.HIGH ) assignment = await assigner._assign_single_model(sub_task, sample_models) assert isinstance(assignment, ModelAssignment) assert assignment.sub_task_id == sub_task.sub_task_id assert assignment.model_id in [model.model_id for model in sample_models] assert 0.0 <= assignment.confidence_score <= 1.0 assert assignment.estimated_cost >= 0.0 assert assignment.estimated_time > 0.0 assert isinstance(assignment.justification, str) assert len(assignment.justification) > 0 @pytest.mark.asyncio @pytest.mark.integration async def test_assign_models_multiple_subtasks(self, mock_model_provider, sample_models): """Test assignment of models to multiple sub-tasks.""" assigner = ModelAssigner(mock_model_provider) # Create ensemble task with multiple sub-tasks ensemble_task = EnsembleTask( task_id="test_ensemble", original_task=TaskContext( task_id="test_original", task_type=TaskType.REASONING, content="Complex reasoning task" ), decomposition_strategy=DecompositionStrategy.PARALLEL, sub_tasks=[ SubTask( sub_task_id="subtask_1", parent_task_id="test_ensemble", content="Analyze aspect 1", task_type=TaskType.ANALYSIS, required_capabilities=[ModelCapability.REASONING] ), SubTask( sub_task_id="subtask_2", parent_task_id="test_ensemble", content="Generate creative solution", task_type=TaskType.CREATIVE, required_capabilities=[ModelCapability.CREATIVITY] ), SubTask( sub_task_id="subtask_3", parent_task_id="test_ensemble", content="Validate results", task_type=TaskType.ANALYSIS, required_capabilities=[ModelCapability.ACCURACY] ) ] ) assignments = await assigner.assign_models(ensemble_task) assert len(assignments) == 3 assert all(isinstance(assignment, ModelAssignment) for assignment in assignments) # Check that all sub-tasks have assignments assigned_subtask_ids = {assignment.sub_task_id for assignment in assignments} expected_subtask_ids = {st.sub_task_id for st in ensemble_task.sub_tasks} assert assigned_subtask_ids == expected_subtask_ids # Check that assignments are stored in history assert len(assigner.assignment_history) == 3 @pytest.mark.unit def test_generate_assignment_justification(self, mock_model_provider, sample_models): """Test generation of assignment justifications.""" assigner = ModelAssigner(mock_model_provider) model = sample_models[0] # GPT-4 sub_task = SubTask( sub_task_id="test_subtask", parent_task_id="test_parent", content="Reasoning task", task_type=TaskType.REASONING, required_capabilities=[ModelCapability.REASONING] ) score = 0.85 justification = assigner._generate_assignment_justification(model, sub_task, score) assert isinstance(justification, str) assert len(justification) > 0 assert model.name in justification assert str(score) in justification assert sub_task.sub_task_id in justification @pytest.mark.asyncio @pytest.mark.edge_case async def test_assign_model_no_suitable_models(self, mock_model_provider): """Test handling when no suitable models are available.""" assigner = ModelAssigner(mock_model_provider) sub_task = SubTask( sub_task_id="test_subtask", parent_task_id="test_parent", content="Test task", task_type=TaskType.REASONING, required_capabilities=[ModelCapability.REASONING] ) # Empty model list with pytest.raises(ValueError, match="No suitable model found"): await assigner._assign_single_model(sub_task, []) class TestEnsembleReasoner: """Test suite for the EnsembleReasoner class.""" @pytest.mark.unit def test_ensemble_reasoner_initialization(self, mock_model_provider): """Test that EnsembleReasoner initializes correctly.""" reasoner = EnsembleReasoner(mock_model_provider) assert reasoner.model_provider == mock_model_provider assert isinstance(reasoner.decomposer, TaskDecomposer) assert isinstance(reasoner.assigner, ModelAssigner) assert isinstance(reasoner.processing_history, list) assert len(reasoner.processing_history) == 0 @pytest.mark.asyncio @pytest.mark.integration async def test_full_ensemble_process(self, mock_model_provider, sample_task): """Test the complete ensemble reasoning process.""" reasoner = EnsembleReasoner(mock_model_provider) result = await reasoner.process(sample_task) assert isinstance(result, EnsembleResult) assert result.task_id == sample_task.task_id assert result.original_task == sample_task assert isinstance(result.final_content, str) assert len(result.final_content) > 0 assert len(result.sub_task_results) > 0 assert isinstance(result.decomposition_strategy, DecompositionStrategy) assert result.total_time > 0.0 assert 0.0 <= result.success_rate <= 1.0 assert result.total_cost >= 0.0 # Check that result is stored in history assert len(reasoner.processing_history) == 1 assert reasoner.processing_history[0] == result @pytest.mark.asyncio @pytest.mark.unit async def test_execute_sequential_strategy(self, mock_model_provider, sample_models): """Test sequential execution strategy.""" reasoner = EnsembleReasoner(mock_model_provider) # Create ensemble task with sequential sub-tasks ensemble_task = EnsembleTask( task_id="test_sequential", original_task=TaskContext( task_id="test_original", task_type=TaskType.REASONING, content="Sequential reasoning task" ), decomposition_strategy=DecompositionStrategy.SEQUENTIAL, sub_tasks=[ SubTask( sub_task_id="seq_1", parent_task_id="test_sequential", content="First step", task_type=TaskType.REASONING, required_capabilities=[ModelCapability.REASONING], priority=TaskPriority.HIGH ), SubTask( sub_task_id="seq_2", parent_task_id="test_sequential", content="Second step", task_type=TaskType.REASONING, required_capabilities=[ModelCapability.REASONING], dependencies=["seq_1"] ) ], assignments=[ ModelAssignment( sub_task_id="seq_1", model_id="openai/gpt-4", confidence_score=0.9, estimated_cost=0.01, estimated_time=2.0, justification="Test assignment" ), ModelAssignment( sub_task_id="seq_2", model_id="anthropic/claude-3-haiku", confidence_score=0.8, estimated_cost=0.005, estimated_time=1.5, justification="Test assignment" ) ] ) results = await reasoner._execute_sequential(ensemble_task) assert len(results) == 2 assert all(isinstance(result, SubTaskResult) for result in results) assert results[0].sub_task.sub_task_id == "seq_1" assert results[1].sub_task.sub_task_id == "seq_2" @pytest.mark.asyncio @pytest.mark.unit async def test_execute_parallel_strategy(self, mock_model_provider): """Test parallel execution strategy.""" reasoner = EnsembleReasoner(mock_model_provider) # Create ensemble task with parallel sub-tasks ensemble_task = EnsembleTask( task_id="test_parallel", original_task=TaskContext( task_id="test_original", task_type=TaskType.ANALYSIS, content="Parallel analysis task" ), decomposition_strategy=DecompositionStrategy.PARALLEL, sub_tasks=[ SubTask( sub_task_id="par_1", parent_task_id="test_parallel", content="Parallel task 1", task_type=TaskType.ANALYSIS, required_capabilities=[ModelCapability.REASONING] ), SubTask( sub_task_id="par_2", parent_task_id="test_parallel", content="Parallel task 2", task_type=TaskType.ANALYSIS, required_capabilities=[ModelCapability.ACCURACY] ) ], assignments=[ ModelAssignment( sub_task_id="par_1", model_id="openai/gpt-4", confidence_score=0.9, estimated_cost=0.01, estimated_time=2.0, justification="Test assignment" ), ModelAssignment( sub_task_id="par_2", model_id="anthropic/claude-3-haiku", confidence_score=0.8, estimated_cost=0.005, estimated_time=1.5, justification="Test assignment" ) ] ) results = await reasoner._execute_parallel(ensemble_task) assert len(results) == 2 assert all(isinstance(result, SubTaskResult) for result in results) # Check that both tasks were executed (order may vary) executed_ids = {result.sub_task.sub_task_id for result in results} assert executed_ids == {"par_1", "par_2"} @pytest.mark.asyncio @pytest.mark.unit async def test_execute_single_sub_task_success(self, mock_model_provider): """Test successful execution of a single sub-task.""" reasoner = EnsembleReasoner(mock_model_provider) sub_task = SubTask( sub_task_id="test_subtask", parent_task_id="test_parent", content="Test sub-task content", task_type=TaskType.REASONING, required_capabilities=[ModelCapability.REASONING], timeout_seconds=5.0, max_retries=1 ) assignment = ModelAssignment( sub_task_id="test_subtask", model_id="openai/gpt-4", confidence_score=0.9, estimated_cost=0.01, estimated_time=2.0, justification="Test assignment" ) result = await reasoner._execute_single_sub_task(sub_task, assignment) assert isinstance(result, SubTaskResult) assert result.sub_task == sub_task assert result.assignment == assignment assert isinstance(result.result, ProcessingResult) assert result.success is True assert result.retry_count == 0 assert result.error_message is None @pytest.mark.asyncio @pytest.mark.unit async def test_execute_single_sub_task_with_retries(self, mock_model_provider): """Test sub-task execution with retry mechanism.""" # Create a provider that fails first time, succeeds second time failing_provider = AsyncMock() call_count = 0 async def mock_process_task(task, model_id, **kwargs): nonlocal call_count call_count += 1 if call_count == 1: raise Exception("Temporary failure") else: return ProcessingResult( task_id=task.task_id, model_id=model_id, content="Success after retry", confidence=0.8 ) failing_provider.process_task.side_effect = mock_process_task reasoner = EnsembleReasoner(failing_provider) sub_task = SubTask( sub_task_id="test_subtask", parent_task_id="test_parent", content="Test content", task_type=TaskType.REASONING, required_capabilities=[ModelCapability.REASONING], timeout_seconds=5.0, max_retries=2 ) assignment = ModelAssignment( sub_task_id="test_subtask", model_id="test_model", confidence_score=0.9, estimated_cost=0.01, estimated_time=2.0, justification="Test" ) result = await reasoner._execute_single_sub_task(sub_task, assignment) assert result.success is True assert result.retry_count == 1 # One retry assert call_count == 2 # Called twice @pytest.mark.unit def test_find_assignment(self, mock_model_provider): """Test finding assignment for a specific sub-task.""" reasoner = EnsembleReasoner(mock_model_provider) assignments = [ ModelAssignment( sub_task_id="task_1", model_id="model_1", confidence_score=0.9, estimated_cost=0.01, estimated_time=2.0, justification="Test" ), ModelAssignment( sub_task_id="task_2", model_id="model_2", confidence_score=0.8, estimated_cost=0.005, estimated_time=1.5, justification="Test" ) ] # Test finding existing assignment assignment = reasoner._find_assignment("task_1", assignments) assert assignment.sub_task_id == "task_1" assert assignment.model_id == "model_1" # Test non-existent assignment with pytest.raises(ValueError, match="No assignment found"): reasoner._find_assignment("task_3", assignments) @pytest.mark.unit def test_synthesize_final_content(self, mock_model_provider, sample_processing_results): """Test synthesis of final content from sub-task results.""" reasoner = EnsembleReasoner(mock_model_provider) # Create sub-task results with different priorities sub_task_results = [] priorities = [TaskPriority.CRITICAL, TaskPriority.HIGH, TaskPriority.MEDIUM] for i, (result, priority) in enumerate(zip(sample_processing_results[:3], priorities)): sub_task = SubTask( sub_task_id=f"subtask_{i}", parent_task_id="test_parent", content=f"Sub-task {i}", task_type=TaskType.REASONING, required_capabilities=[ModelCapability.REASONING], priority=priority ) assignment = ModelAssignment( sub_task_id=f"subtask_{i}", model_id=result.model_id, confidence_score=0.8, estimated_cost=0.01, estimated_time=2.0, justification="Test" ) sub_task_results.append( SubTaskResult( sub_task=sub_task, assignment=assignment, result=result, success=True ) ) ensemble_task = EnsembleTask( task_id="test_ensemble", original_task=TaskContext( task_id="test_original", task_type=TaskType.REASONING, content="Test task" ), decomposition_strategy=DecompositionStrategy.SEQUENTIAL ) final_content = reasoner._synthesize_final_content(sub_task_results, ensemble_task) assert isinstance(final_content, str) assert len(final_content) > 0 # Should contain content from all successful results for result in sub_task_results: assert any(part in final_content for part in result.result.content.split()[:3]) @pytest.mark.unit def test_calculate_overall_quality(self, mock_model_provider, sample_processing_results): """Test calculation of overall quality metrics.""" reasoner = EnsembleReasoner(mock_model_provider) # Create successful sub-task results successful_results = [] for i, result in enumerate(sample_processing_results[:3]): sub_task = SubTask( sub_task_id=f"subtask_{i}", parent_task_id="test_parent", content=f"Sub-task {i}", task_type=TaskType.REASONING, required_capabilities=[ModelCapability.REASONING] ) assignment = ModelAssignment( sub_task_id=f"subtask_{i}", model_id=result.model_id, confidence_score=0.8, estimated_cost=0.01, estimated_time=2.0, justification="Test" ) successful_results.append( SubTaskResult( sub_task=sub_task, assignment=assignment, result=result, success=True ) ) quality = reasoner._calculate_overall_quality(successful_results) assert 0.0 <= quality.accuracy <= 1.0 assert 0.0 <= quality.consistency <= 1.0 assert 0.0 <= quality.completeness <= 1.0 assert 0.0 <= quality.relevance <= 1.0 assert 0.0 <= quality.confidence <= 1.0 assert 0.0 <= quality.coherence <= 1.0 assert 0.0 <= quality.overall_score() <= 1.0 @pytest.mark.unit def test_calculate_performance_metrics(self, mock_model_provider): """Test calculation of performance metrics.""" reasoner = EnsembleReasoner(mock_model_provider) # Create sub-task results with some successes and failures sub_task_results = [] for i in range(4): sub_task = SubTask( sub_task_id=f"subtask_{i}", parent_task_id="test_parent", content=f"Sub-task {i}", task_type=TaskType.REASONING, required_capabilities=[ModelCapability.REASONING] ) assignment = ModelAssignment( sub_task_id=f"subtask_{i}", model_id=f"model_{i}", confidence_score=0.8, estimated_cost=0.01, estimated_time=2.0, justification="Test" ) # First 3 succeed, last one fails success = i < 3 result = ProcessingResult( task_id=f"subtask_{i}", model_id=f"model_{i}", content=f"Result {i}" if success else "", confidence=0.8 if success else 0.0, processing_time=2.0 if success else 0.0 ) sub_task_results.append( SubTaskResult( sub_task=sub_task, assignment=assignment, result=result, success=success, error_message=None if success else "Test error" ) ) total_time = 10.0 metrics = reasoner._calculate_performance_metrics(sub_task_results, total_time) assert metrics.success_rate == 0.75 # 3/4 successful assert metrics.error_rate == 0.25 # 1/4 failed assert metrics.throughput > 0.0 assert metrics.response_time > 0.0 assert metrics.cost_efficiency > 0.0 assert metrics.resource_utilization == 0.75 @pytest.mark.unit def test_get_processing_history(self, mock_model_provider): """Test access to processing history.""" reasoner = EnsembleReasoner(mock_model_provider) # Add some test results to history test_results = [ EnsembleResult( task_id=f"task_{i}", original_task=TaskContext( task_id=f"task_{i}", task_type=TaskType.REASONING, content=f"Task {i}" ), final_content=f"Result {i}", sub_task_results=[], decomposition_strategy=DecompositionStrategy.SEQUENTIAL, overall_quality=Mock(), performance_metrics=Mock(), total_cost=0.01, total_time=2.0, success_rate=1.0 ) for i in range(5) ] reasoner.processing_history = test_results # Test getting full history full_history = reasoner.get_processing_history() assert len(full_history) == 5 assert full_history == test_results # Test getting limited history limited_history = reasoner.get_processing_history(limit=3) assert len(limited_history) == 3 assert limited_history == test_results[-3:] @pytest.mark.asyncio @pytest.mark.performance async def test_ensemble_performance_with_many_subtasks(self, performance_mock_provider): """Test ensemble performance with many sub-tasks.""" reasoner = EnsembleReasoner(performance_mock_provider) # Create complex task that will generate many sub-tasks complex_task = TaskContext( task_id="complex_performance_test", task_type=TaskType.ANALYSIS, content="A" * 2000, # Long content to trigger complex decomposition requirements={"detail_level": "comprehensive"} ) start_time = datetime.now() result = await reasoner.process(complex_task) end_time = datetime.now() processing_time = (end_time - start_time).total_seconds() # Should complete within reasonable time assert processing_time < 10.0 # 10 seconds max assert isinstance(result, EnsembleResult) assert len(result.sub_task_results) >= 3 assert result.success_rate > 0.0 @pytest.mark.asyncio @pytest.mark.edge_case async def test_ensemble_with_all_subtask_failures(self, failing_model_provider): """Test ensemble handling when all sub-tasks fail.""" reasoner = EnsembleReasoner(failing_model_provider) task = TaskContext( task_id="failing_task", task_type=TaskType.REASONING, content="This task will fail" ) result = await reasoner.process(task) assert isinstance(result, EnsembleResult) assert result.success_rate == 0.0 assert all(not sub_result.success for sub_result in result.sub_task_results) assert "Unable to complete task" in result.final_content @pytest.mark.asyncio @pytest.mark.integration async def test_concurrent_ensemble_processing(self, mock_model_provider): """Test concurrent processing of multiple ensemble tasks.""" reasoner = EnsembleReasoner(mock_model_provider) # Create multiple tasks tasks = [ TaskContext( task_id=f"concurrent_task_{i}", task_type=TaskType.REASONING, content=f"Concurrent task {i} content" ) for i in range(3) ] # Process all tasks concurrently results = await asyncio.gather( *[reasoner.process(task) for task in tasks], return_exceptions=True ) # All should succeed assert len(results) == 3 assert all(isinstance(result, EnsembleResult) for result in results) assert len(set(result.task_id for result in results)) == 3 # All unique # Check processing history assert len(reasoner.processing_history) == 3

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/physics91/openrouter-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server