---
phase: 05-retrieval-orchestrator
plan: "01"
type: execute
wave: 1
depends_on: []
files_modified:
- src/skill_retriever/workflows/models.py
- src/skill_retriever/workflows/pipeline.py
- src/skill_retriever/workflows/__init__.py
- tests/test_pipeline.py
autonomous: true
must_haves:
truths:
- "Given a task description, pipeline returns ranked components within latency SLA"
- "Simple queries complete in under 500ms"
- "Complex queries complete in under 1000ms"
- "Repeated queries hit cache and return faster"
artifacts:
- path: "src/skill_retriever/workflows/models.py"
provides: "PipelineResult, ConflictInfo dataclasses"
exports: ["PipelineResult", "ConflictInfo"]
- path: "src/skill_retriever/workflows/pipeline.py"
provides: "RetrievalPipeline coordinator class"
exports: ["RetrievalPipeline"]
- path: "tests/test_pipeline.py"
provides: "Pipeline tests including latency verification"
min_lines: 50
key_links:
- from: "src/skill_retriever/workflows/pipeline.py"
to: "nodes/retrieval/*"
via: "imports and calls existing retrieval functions"
pattern: "from skill_retriever\\.nodes\\.retrieval import"
- from: "src/skill_retriever/workflows/pipeline.py"
to: "memory/graph_store.py"
via: "GraphStore protocol for node/edge access"
pattern: "graph_store\\.(get_node|get_edges)"
---
<objective>
Create the RetrievalPipeline coordinator that orchestrates all Phase 4 retrieval nodes into a single entry point with early exit, caching, and latency monitoring.
Purpose: This is the core orchestration layer that transforms independent retrieval nodes (vector search, PPR, flow pruning, score fusion) into a unified pipeline with performance guarantees.
Output: Working RetrievalPipeline class that chains retrieval stages, caches results with LRU, and tracks latency per call.
</objective>
<execution_context>
@C:\Users\33641\.claude/get-shit-done/workflows/execute-plan.md
@C:\Users\33641\.claude/get-shit-done/templates/summary.md
</execution_context>
<context>
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/05-retrieval-orchestrator/05-RESEARCH.md
# Phase 4 retrieval nodes (all available)
@src/skill_retriever/nodes/retrieval/__init__.py
@src/skill_retriever/nodes/retrieval/query_planner.py
@src/skill_retriever/nodes/retrieval/vector_search.py
@src/skill_retriever/nodes/retrieval/ppr_engine.py
@src/skill_retriever/nodes/retrieval/flow_pruner.py
@src/skill_retriever/nodes/retrieval/score_fusion.py
@src/skill_retriever/nodes/retrieval/context_assembler.py
@src/skill_retriever/nodes/retrieval/models.py
# Memory layer
@src/skill_retriever/memory/graph_store.py
@src/skill_retriever/memory/vector_store.py
# Entities
@src/skill_retriever/entities/components.py
@src/skill_retriever/entities/graph.py
</context>
<tasks>
<task type="auto">
<name>Task 1: Create pipeline result models</name>
<files>src/skill_retriever/workflows/models.py, src/skill_retriever/workflows/__init__.py</files>
<action>
Create `src/skill_retriever/workflows/models.py` with:
1. `ConflictInfo` dataclass:
- `component_a: str` - First component ID in conflict
- `component_b: str` - Second component ID in conflict
- `reason: str` - Description of conflict (from edge metadata or default)
2. `PipelineResult` dataclass:
- `context: RetrievalContext` - From context_assembler (components + token count)
- `conflicts: list[ConflictInfo]` - Any conflicts found between recommended components
- `dependencies_added: list[str]` - Component IDs added via transitive resolution (empty for now, Plan 02 populates)
- `latency_ms: float` - Total pipeline execution time in milliseconds
- `cache_hit: bool` - Whether result came from cache
Use `from __future__ import annotations` with a `TYPE_CHECKING` block for type-only imports.
Import `RetrievalContext` from `nodes.retrieval.context_assembler`.
Update `src/skill_retriever/workflows/__init__.py` to export:
- `ConflictInfo`
- `PipelineResult`
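A minimal sketch of what the two dataclasses could look like. The `RetrievalContext` import path follows the plan; the default values (empty lists, `0.0`, `False`) are illustrative assumptions, not requirements:

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Type-only import: resolved by pyright, never executed at runtime.
    from skill_retriever.nodes.retrieval.context_assembler import RetrievalContext


@dataclass
class ConflictInfo:
    component_a: str  # First component ID in conflict
    component_b: str  # Second component ID in conflict
    reason: str       # From edge metadata, or a default message


@dataclass
class PipelineResult:
    context: RetrievalContext
    conflicts: list[ConflictInfo] = field(default_factory=list)
    dependencies_added: list[str] = field(default_factory=list)  # Plan 02 populates
    latency_ms: float = 0.0
    cache_hit: bool = False
```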
</action>
<verify>
```bash
cd C:/Users/33641/repos/skill-retriever
uv run python -c "from skill_retriever.workflows import PipelineResult, ConflictInfo; print('Models OK')"
uv run ruff check src/skill_retriever/workflows/
uv run pyright src/skill_retriever/workflows/
```
</verify>
<done>PipelineResult and ConflictInfo importable from workflows package, lint and type checks pass</done>
</task>
<task type="auto">
<name>Task 2: Create RetrievalPipeline coordinator with caching and latency monitoring</name>
<files>src/skill_retriever/workflows/pipeline.py, src/skill_retriever/workflows/__init__.py, tests/test_pipeline.py</files>
<action>
Create `src/skill_retriever/workflows/pipeline.py` with `RetrievalPipeline` class:
```python
class RetrievalPipeline:
"""Coordinates retrieval stages with caching and latency monitoring."""
def __init__(
self,
graph_store: GraphStore,
vector_store: FAISSVectorStore,
token_budget: int = 2000,
cache_size: int = 128,
) -> None:
...
def retrieve(
self,
query: str,
component_type: ComponentType | None = None,
top_k: int = 10,
) -> PipelineResult:
"""Execute full retrieval pipeline, returning cached result if available."""
...
def clear_cache(self) -> None:
"""Invalidate all cached results."""
...
@property
def cache_info(self) -> dict[str, int]:
"""Return cache statistics (hits, misses, size)."""
...
```
Implementation details:
1. **Caching Strategy:**
- Wrap the internal `_retrieve_impl` with `functools.lru_cache(maxsize=cache_size)` inside `__init__` (a class-level decorator cannot see the runtime `cache_size` argument)
- Convert `component_type` to `.value` string for hashability (research pitfall #1)
- Cache key: `(query, component_type_str)`
2. **Pipeline Stages (in order):**
- Stage 1: `extract_query_entities()` + `plan_retrieval()` from query_planner
- Stage 2: `search_with_type_filter()` from vector_search (always run)
- Stage 3: If plan.use_ppr: `run_ppr_retrieval()` then optionally `flow_based_pruning()`
- Stage 4: `fuse_retrieval_results()` from score_fusion
- Stage 5: `assemble_context()` with token_budget (from Plan 01, no dependency resolution yet)
- Note: Dependency resolution and conflict detection added in Plan 02
3. **Latency Monitoring:**
- Wrap entire pipeline in `time.perf_counter()` start/end
- Store `latency_ms = (end - start) * 1000` in result
4. **Cache Hit Detection:**
- Compare cache_info before/after call
- If hits increased, set `cache_hit=True`
5. **Early Exit (simple optimization):**
- If plan says no PPR needed AND vector results are high confidence (top score > 0.9), skip graph retrieval
Update `__init__.py` to export `RetrievalPipeline`.
Create `tests/test_pipeline.py` with tests:
- `test_pipeline_returns_result` - Basic retrieval returns PipelineResult
- `test_pipeline_caching` - Second call returns cache_hit=True and completes faster
- `test_pipeline_latency_tracked` - latency_ms > 0
- `test_pipeline_respects_type_filter` - Pass component_type, verify results match
- `test_pipeline_respects_token_budget` - Result context.total_tokens <= budget
- `test_clear_cache` - After clear_cache(), next call is cache miss
</action>
<verify>
```bash
cd C:/Users/33641/repos/skill-retriever
uv run pytest tests/test_pipeline.py -v
uv run ruff check src/skill_retriever/workflows/
uv run pyright src/skill_retriever/workflows/
```
</verify>
<done>RetrievalPipeline with caching passes all tests, latency tracked, cache hit detection works</done>
</task>
</tasks>
<verification>
```bash
cd C:/Users/33641/repos/skill-retriever
# All tests pass
uv run pytest -v
# Lint clean
uv run ruff check src/
# Types clean
uv run pyright src/
```
</verification>
<success_criteria>
- RetrievalPipeline class exists and orchestrates all Phase 4 retrieval nodes
- Query results are cached with LRU (128 default size)
- Each result includes latency_ms measurement
- cache_hit flag correctly indicates cached vs fresh results
- Token budget is enforced via context_assembler
- All tests pass, lint clean, types clean
</success_criteria>
<output>
After completion, create `.planning/phases/05-retrieval-orchestrator/05-01-SUMMARY.md`
</output>