test_integration.pyā¢2.51 kB
"""Integration tests for tool chaining and workflows (LLM-Orchestrator Pattern)."""
from analysis_mcp.server import _apply_lens, _compare_positions, _deconstruct_claim, _get_trace
class TestWorkflowIntegration:
"""Test realistic analysis workflows."""
def test_full_analysis_pipeline(self, sample_event):
"""Test a complete analysis workflow."""
# Step 1: Deconstruct
event_as_claim = f"Event: {sample_event}"
deconstruct_result = _deconstruct_claim(event_as_claim)
# Verify orchestrator output
assert "trace_id" in deconstruct_result
assert "next_prompt" in deconstruct_result
# Step 2: Compare perspectives
compare_result = _compare_positions(sample_event)
assert "trace_id" in compare_result
assert len(compare_result["outline"]["perspectives"]) > 0
# Step 3: Apply multiple lenses
lenses = ["economic", "geopolitical", "technological"]
lens_results = []
for lens in lenses:
result = _apply_lens(sample_event, lens)
assert "error" not in result
assert "trace_id" in result
lens_results.append(result)
assert len(lens_results) == 3
# Step 4: Verify we can recall all traces
for result in [deconstruct_result, compare_result] + lens_results:
trace = _get_trace(result["trace_id"])
assert trace["found"] is True
def test_multi_lens_analysis(self, sample_claim, all_lenses):
"""Test applying all lenses to same event."""
results = {}
for lens in all_lenses:
result = _apply_lens(sample_claim, lens)
assert "error" not in result
assert "trace_id" in result
results[lens] = result
assert len(results) == len(all_lenses)
# Verify each lens has unique trace_id
trace_ids = [r["trace_id"] for r in results.values()]
assert len(set(trace_ids)) == len(trace_ids) # All unique
def test_comparative_analysis_workflow(self, sample_event):
"""Test comparing different ideological perspectives."""
positions = ["progressive", "conservative", "libertarian"]
result = _compare_positions(sample_event, positions=positions)
assert result["outline"]["topic"] == sample_event
assert result["outline"]["perspectives"] == positions
assert "INTERPRETATION" in result["next_prompt"]
assert "SYNTHESIS" in result["next_prompt"]