"""
End-to-end tests for Research Router with mocked providers.
Tests the full flow through the research router, including:
- Dispatch to workflow classes
- Response envelope formatting
- Thread persistence
- Error handling
"""
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from foundry_mcp.core.providers import ProviderResult, ProviderStatus, TokenUsage
from foundry_mcp.core.research.workflows.base import WorkflowResult
# =============================================================================
# Test Fixtures
# =============================================================================
@pytest.fixture
def mock_config(tmp_path: Path):
"""Mock server config for testing."""
with patch("foundry_mcp.tools.unified.research._get_config") as mock_get_config:
mock_cfg = MagicMock()
mock_cfg.research.enabled = True
mock_cfg.get_research_dir.return_value = tmp_path
mock_cfg.research.ttl_hours = 24
mock_cfg.research.default_provider = "gemini"
mock_cfg.research.consensus_providers = ["gemini", "claude"]
mock_cfg.research.thinkdeep_max_depth = 3
mock_cfg.research.ideate_perspectives = ["technical", "creative"]
mock_cfg.research.max_messages_per_thread = 50 # Prevent MagicMock comparison issues
mock_get_config.return_value = mock_cfg
yield mock_cfg
@pytest.fixture
def mock_memory():
"""Mock research memory instance."""
with patch("foundry_mcp.tools.unified.research._get_memory") as mock_get_memory:
memory = MagicMock()
mock_get_memory.return_value = memory
yield memory
@pytest.fixture
def mock_provider_result():
"""Factory for creating mock ProviderResult objects."""
def _create(
content: str = "Generated research response",
success: bool = True,
provider_id: str = "gemini",
model: str = "gemini-2.0-flash",
):
return ProviderResult(
content=content,
status=ProviderStatus.SUCCESS if success else ProviderStatus.ERROR,
provider_id=provider_id,
model_used=model,
tokens=TokenUsage(input_tokens=100, output_tokens=200, total_tokens=300),
duration_ms=750.0,
)
return _create
@pytest.fixture
def mock_provider_context(mock_provider_result):
"""Create a mock provider context that returns successful results."""
context = MagicMock()
context.generate.return_value = mock_provider_result()
return context
# =============================================================================
# Chat Workflow E2E Tests
# =============================================================================
class TestChatWorkflowE2E:
"""End-to-end tests for chat workflow through router."""
def test_chat_new_thread_full_flow(
self, mock_config, mock_memory
):
"""Chat creates new thread and returns response envelope."""
from foundry_mcp.tools.unified.research import _dispatch_research_action
with patch(
"foundry_mcp.tools.unified.research.ChatWorkflow"
) as MockWorkflow:
mock_workflow = MagicMock()
mock_workflow.execute.return_value = WorkflowResult(
success=True,
content="Hello! I'm here to help with your research.",
provider_id="gemini",
model_used="gemini-2.0-flash",
tokens_used=150,
duration_ms=500.0,
metadata={
"thread_id": "thread-abc123",
"message_count": 1,
},
)
MockWorkflow.return_value = mock_workflow
result = _dispatch_research_action(
action="chat",
prompt="Hello, can you help me?",
)
assert result["success"] is True
assert result["data"]["content"] == "Hello! I'm here to help with your research."
assert result["data"]["thread_id"] == "thread-abc123"
assert result["data"]["message_count"] == 1
assert result["data"]["provider_id"] == "gemini"
assert result["meta"]["version"] == "response-v2"
def test_chat_continue_thread(
self, mock_config, mock_memory
):
"""Chat continues existing thread with context."""
from foundry_mcp.tools.unified.research import _dispatch_research_action
with patch(
"foundry_mcp.tools.unified.research.ChatWorkflow"
) as MockWorkflow:
mock_workflow = MagicMock()
mock_workflow.execute.return_value = WorkflowResult(
success=True,
content="Here's more information on that topic.",
provider_id="gemini",
model_used="gemini-2.0-flash",
metadata={
"thread_id": "thread-existing",
"message_count": 5,
},
)
MockWorkflow.return_value = mock_workflow
result = _dispatch_research_action(
action="chat",
prompt="Tell me more about that.",
thread_id="thread-existing",
)
assert result["success"] is True
assert result["data"]["thread_id"] == "thread-existing"
assert result["data"]["message_count"] == 5
def test_chat_provider_failure(
self, mock_config, mock_memory
):
"""Chat handles provider failure gracefully."""
from foundry_mcp.tools.unified.research import _dispatch_research_action
with patch(
"foundry_mcp.tools.unified.research.ChatWorkflow"
) as MockWorkflow:
mock_workflow = MagicMock()
mock_workflow.execute.return_value = WorkflowResult(
success=False,
content="",
error="Provider unavailable: Connection timeout",
)
MockWorkflow.return_value = mock_workflow
result = _dispatch_research_action(
action="chat",
prompt="Hello",
)
assert result["success"] is False
assert "unavailable" in result["error"].lower()
# =============================================================================
# Consensus Workflow E2E Tests
# =============================================================================
class TestConsensusWorkflowE2E:
"""End-to-end tests for consensus workflow through router."""
def test_consensus_synthesize_strategy(
self, mock_config, mock_memory
):
"""Consensus workflow synthesizes multiple provider responses."""
from foundry_mcp.tools.unified.research import _dispatch_research_action
with patch(
"foundry_mcp.tools.unified.research.ConsensusWorkflow"
) as MockWorkflow:
mock_workflow = MagicMock()
mock_workflow.execute.return_value = WorkflowResult(
success=True,
content="Synthesized consensus: Both providers agree that...",
provider_id="synthesis",
model_used="gemini-2.0-flash",
tokens_used=500,
duration_ms=1500.0,
metadata={
"consensus_id": "cons-123",
"providers_consulted": ["gemini", "claude"],
"strategy": "synthesize",
"response_count": 2,
},
)
MockWorkflow.return_value = mock_workflow
result = _dispatch_research_action(
action="consensus",
prompt="What is the best approach for X?",
strategy="synthesize",
)
assert result["success"] is True
assert "consensus" in result["data"]["content"].lower()
assert result["data"]["consensus_id"] == "cons-123"
assert len(result["data"]["providers_consulted"]) == 2
assert result["meta"]["version"] == "response-v2"
def test_consensus_all_responses_strategy(
self, mock_config, mock_memory
):
"""Consensus workflow returns all individual responses."""
from foundry_mcp.tools.unified.research import _dispatch_research_action
with patch(
"foundry_mcp.tools.unified.research.ConsensusWorkflow"
) as MockWorkflow:
mock_workflow = MagicMock()
mock_workflow.execute.return_value = WorkflowResult(
success=True,
content="Provider 1: ... Provider 2: ...",
metadata={
"consensus_id": "cons-456",
"providers_consulted": ["gemini", "claude", "openai"],
"strategy": "all_responses",
"response_count": 3,
},
)
MockWorkflow.return_value = mock_workflow
result = _dispatch_research_action(
action="consensus",
prompt="Compare approaches",
strategy="all_responses",
)
assert result["success"] is True
assert len(result["data"]["providers_consulted"]) == 3
assert result["data"]["response_count"] == 3
# =============================================================================
# ThinkDeep Workflow E2E Tests
# =============================================================================
class TestThinkDeepWorkflowE2E:
"""End-to-end tests for thinkdeep workflow through router."""
def test_thinkdeep_new_investigation(
self, mock_config, mock_memory
):
"""ThinkDeep starts new investigation with topic."""
from foundry_mcp.tools.unified.research import _dispatch_research_action
with patch(
"foundry_mcp.tools.unified.research.ThinkDeepWorkflow"
) as MockWorkflow:
mock_workflow = MagicMock()
mock_workflow.execute.return_value = WorkflowResult(
success=True,
content="Initial investigation findings...",
provider_id="gemini",
model_used="gemini-2.0-flash",
tokens_used=300,
duration_ms=2000.0,
metadata={
"investigation_id": "inv-789",
"current_depth": 1,
"max_depth": 3,
"converged": False,
"hypothesis_count": 3,
"step_count": 1,
},
)
MockWorkflow.return_value = mock_workflow
result = _dispatch_research_action(
action="thinkdeep",
topic="Why do databases use B-trees?",
)
assert result["success"] is True
assert result["data"]["investigation_id"] == "inv-789"
assert result["data"]["current_depth"] == 1
assert result["data"]["converged"] is False
assert result["meta"]["version"] == "response-v2"
def test_thinkdeep_continue_investigation(
self, mock_config, mock_memory
):
"""ThinkDeep continues existing investigation."""
from foundry_mcp.tools.unified.research import _dispatch_research_action
with patch(
"foundry_mcp.tools.unified.research.ThinkDeepWorkflow"
) as MockWorkflow:
mock_workflow = MagicMock()
mock_workflow.execute.return_value = WorkflowResult(
success=True,
content="Deeper analysis reveals...",
metadata={
"investigation_id": "inv-existing",
"current_depth": 2,
"max_depth": 3,
"converged": False,
"hypothesis_count": 5,
"step_count": 3,
},
)
MockWorkflow.return_value = mock_workflow
result = _dispatch_research_action(
action="thinkdeep",
investigation_id="inv-existing",
query="What about performance implications?",
)
assert result["success"] is True
assert result["data"]["investigation_id"] == "inv-existing"
assert result["data"]["current_depth"] == 2
def test_thinkdeep_converged(
self, mock_config, mock_memory
):
"""ThinkDeep indicates when investigation has converged."""
from foundry_mcp.tools.unified.research import _dispatch_research_action
with patch(
"foundry_mcp.tools.unified.research.ThinkDeepWorkflow"
) as MockWorkflow:
mock_workflow = MagicMock()
mock_workflow.execute.return_value = WorkflowResult(
success=True,
content="Final conclusion: ...",
metadata={
"investigation_id": "inv-done",
"current_depth": 3,
"max_depth": 3,
"converged": True,
"hypothesis_count": 8,
"step_count": 5,
},
)
MockWorkflow.return_value = mock_workflow
result = _dispatch_research_action(
action="thinkdeep",
investigation_id="inv-done",
)
assert result["success"] is True
assert result["data"]["converged"] is True
# =============================================================================
# Ideate Workflow E2E Tests
# =============================================================================
class TestIdeateWorkflowE2E:
"""End-to-end tests for ideate workflow through router."""
def test_ideate_generate_ideas(
self, mock_config, mock_memory
):
"""Ideate generates ideas for a topic."""
from foundry_mcp.tools.unified.research import _dispatch_research_action
with patch(
"foundry_mcp.tools.unified.research.IdeateWorkflow"
) as MockWorkflow:
mock_workflow = MagicMock()
mock_workflow.execute.return_value = WorkflowResult(
success=True,
content="1. First idea\n2. Second idea\n3. Third idea",
provider_id="gemini",
model_used="gemini-2.0-flash",
tokens_used=200,
duration_ms=800.0,
metadata={
"ideation_id": "ide-abc",
"phase": "divergent",
"idea_count": 10,
"cluster_count": 0,
},
)
MockWorkflow.return_value = mock_workflow
result = _dispatch_research_action(
action="ideate",
topic="New features for the application",
ideation_action="generate",
)
assert result["success"] is True
assert result["data"]["ideation_id"] == "ide-abc"
assert result["data"]["phase"] == "divergent"
assert result["data"]["idea_count"] == 10
assert result["meta"]["version"] == "response-v2"
def test_ideate_cluster_ideas(
self, mock_config, mock_memory
):
"""Ideate clusters existing ideas."""
from foundry_mcp.tools.unified.research import _dispatch_research_action
with patch(
"foundry_mcp.tools.unified.research.IdeateWorkflow"
) as MockWorkflow:
mock_workflow = MagicMock()
mock_workflow.execute.return_value = WorkflowResult(
success=True,
content="Cluster 1: Technical\nCluster 2: UX",
metadata={
"ideation_id": "ide-existing",
"phase": "convergent",
"idea_count": 10,
"cluster_count": 3,
},
)
MockWorkflow.return_value = mock_workflow
result = _dispatch_research_action(
action="ideate",
ideation_id="ide-existing",
ideation_action="cluster",
)
assert result["success"] is True
assert result["data"]["phase"] == "convergent"
assert result["data"]["cluster_count"] == 3
# =============================================================================
# Thread Operations E2E Tests
# =============================================================================
class TestThreadOperationsE2E:
"""End-to-end tests for thread management operations."""
def test_thread_list(self, mock_config, mock_memory):
"""Thread-list returns all threads."""
from foundry_mcp.core.research.models import ThreadStatus
from foundry_mcp.tools.unified.research import _dispatch_research_action
with patch(
"foundry_mcp.tools.unified.research.ChatWorkflow"
) as MockWorkflow:
mock_workflow = MagicMock()
mock_workflow.list_threads.return_value = [
{
"id": "t-1",
"title": "Thread 1",
"status": ThreadStatus.ACTIVE.value,
"message_count": 5,
},
{
"id": "t-2",
"title": "Thread 2",
"status": ThreadStatus.ACTIVE.value,
"message_count": 3,
},
]
MockWorkflow.return_value = mock_workflow
result = _dispatch_research_action(action="thread-list")
assert result["success"] is True
assert len(result["data"]["threads"]) == 2
assert result["data"]["count"] == 2
def test_thread_get(self, mock_config, mock_memory):
"""Thread-get returns specific thread details."""
from foundry_mcp.core.research.models import ThreadStatus
from foundry_mcp.tools.unified.research import _dispatch_research_action
with patch(
"foundry_mcp.tools.unified.research.ChatWorkflow"
) as MockWorkflow:
mock_workflow = MagicMock()
mock_workflow.get_thread.return_value = {
"id": "t-target",
"title": "Target Thread",
"status": ThreadStatus.ACTIVE.value,
"message_count": 10,
"messages": [],
}
MockWorkflow.return_value = mock_workflow
result = _dispatch_research_action(
action="thread-get",
thread_id="t-target",
)
assert result["success"] is True
# Response structure depends on implementation
assert result["data"]["id"] == "t-target" or (
"thread" in result["data"] and result["data"]["thread"]["id"] == "t-target"
)
def test_thread_delete(self, mock_config, mock_memory):
"""Thread-delete removes thread."""
from foundry_mcp.tools.unified.research import _dispatch_research_action
with patch(
"foundry_mcp.tools.unified.research.ChatWorkflow"
) as MockWorkflow:
mock_workflow = MagicMock()
mock_workflow.delete_thread.return_value = True
MockWorkflow.return_value = mock_workflow
result = _dispatch_research_action(
action="thread-delete",
thread_id="t-to-delete",
)
assert result["success"] is True
# =============================================================================
# Response Envelope E2E Tests
# =============================================================================
class TestResponseEnvelopeE2E:
"""End-to-end tests verifying response envelope structure."""
def test_success_envelope_structure(
self, mock_config, mock_memory
):
"""Successful response has correct envelope structure."""
from foundry_mcp.tools.unified.research import _dispatch_research_action
with patch(
"foundry_mcp.tools.unified.research.ChatWorkflow"
) as MockWorkflow:
mock_workflow = MagicMock()
mock_workflow.execute.return_value = WorkflowResult(
success=True,
content="Response content",
provider_id="gemini",
model_used="gemini-2.0-flash",
tokens_used=100,
metadata={"thread_id": "t-1", "message_count": 1},
)
MockWorkflow.return_value = mock_workflow
result = _dispatch_research_action(action="chat", prompt="Test")
# Verify envelope structure
assert "success" in result
assert "data" in result
assert "meta" in result
assert result["success"] is True
assert result["meta"]["version"] == "response-v2"
def test_error_envelope_structure(
self, mock_config, mock_memory
):
"""Error response has correct envelope structure."""
from foundry_mcp.tools.unified.research import _dispatch_research_action
result = _dispatch_research_action(action="chat") # Missing prompt
assert "success" in result
assert "error" in result
assert "data" in result
assert result["success"] is False
assert "error_code" in result["data"]
assert "error_type" in result["data"]
# =============================================================================
# Error Handling E2E Tests
# =============================================================================
class TestErrorHandlingE2E:
"""End-to-end tests for error handling."""
def test_invalid_action_error(
self, mock_config, mock_memory
):
"""Invalid action returns appropriate error."""
from foundry_mcp.tools.unified.research import _dispatch_research_action
result = _dispatch_research_action(action="nonexistent_action")
assert result["success"] is False
# Error message contains "unsupported" for unknown actions
assert "unsupported" in result["error"].lower() or "invalid" in result["error"].lower()
assert result["data"]["error_code"] == "VALIDATION_ERROR"
def test_missing_required_param_error(
self, mock_config, mock_memory
):
"""Missing required parameter returns validation error."""
from foundry_mcp.tools.unified.research import _dispatch_research_action
# Chat requires prompt
result = _dispatch_research_action(action="chat")
assert result["success"] is False
assert "prompt" in result["error"].lower()
assert result["data"]["error_type"] == "validation"
def test_workflow_exception_error(
self, mock_config, mock_memory
):
"""Workflow exception is propagated when not handled."""
from foundry_mcp.tools.unified.research import _dispatch_research_action
with patch(
"foundry_mcp.tools.unified.research.ChatWorkflow"
) as MockWorkflow:
mock_workflow = MagicMock()
mock_workflow.execute.side_effect = RuntimeError("Unexpected error")
MockWorkflow.return_value = mock_workflow
# The implementation may propagate or catch exceptions
# Both behaviors are acceptable - verify it doesn't silently fail
try:
result = _dispatch_research_action(action="chat", prompt="Test")
# If caught, should be error result
assert result["success"] is False
assert "error" in result
except RuntimeError:
# If propagated, that's also acceptable behavior
pass