/**
* @fileoverview Auto-generated OODA loop catalog
* DO NOT EDIT MANUALLY - Generated by scripts/embed-loops.ts
* Source: .claude/commands/loops/
* @module src/resources/loops-content
*/
/**
 * Header metadata parsed from a single embedded loop document.
 *
 * Field names intentionally use snake_case to mirror the generated JSON
 * emitted by scripts/embed-loops.ts (see the metadata objects in
 * LOOPS_CATALOG below) — do not rename.
 */
export interface LoopMetadata {
/** Loop category, e.g. "authoring" or "exploration". */
type: string;
/** Rough execution-speed class, e.g. "medium". */
speed: string;
/** Unit the loop operates over, e.g. "document" or "session". */
scope: string;
/** One-line human-readable summary of the loop. */
description: string;
/** Interface contract version string, e.g. "loop-interface@1.0". */
interface_version: string;
/** Declared inputs; the generator currently emits an empty array, so the element shape is not modeled yet. */
inputs: unknown[];
/** Declared outputs; currently emitted as an empty array (shape not modeled). */
outputs: unknown[];
/** Signals the loop emits; currently emitted as an empty array (shape not modeled). */
signals: unknown[];
/** Composition rules (nesting/containment); currently emitted as an empty object (shape not modeled). */
composition: Record<string, unknown>;
}
/**
 * One embedded OODA loop: the full markdown source of the loop file plus
 * the metadata parsed from its header.
 */
export interface Loop {
/** Raw markdown content of the loop definition file, embedded verbatim. */
content: string;
/** Structured metadata extracted from the document's header fields. */
metadata: LoopMetadata;
}
/**
 * Two-level catalog of embedded loops, keyed first by category
 * (e.g. "authoring", "exploration") and then by loop name
 * (e.g. "code-generation", "spec-drafting").
 *
 * NOTE(review): plain index signatures mean lookups are typed as present
 * even for unknown keys unless `noUncheckedIndexedAccess` is enabled —
 * callers should guard missing categories/loops at runtime.
 */
export interface LoopsCatalog {
[category: string]: {
[loopName: string]: Loop;
};
}
export const LOOPS_CATALOG: LoopsCatalog = {
'authoring': {
'code-generation': {
content: `# Code Generation Loop
Generate implementation code from specifications with iterative refinement.
**Version**: 1.0.0
**Interface**: loop-interface@1.0
## Classification
- **Type**: authoring
- **Speed**: medium (~1-5 minutes per component)
- **Scope**: document (spec) → files (implementation)
## Interface
### Inputs
| Name | Type | Required | Description |
|------|------|----------|-------------|
| spec | SpecDocument | yes | Specification to implement |
| codebase_context | CodebaseContext | no | Existing code patterns and conventions |
| target_files | List<FilePath> | no | Files to create/modify |
| implementation_style | Style | no | Coding style preferences |
### Outputs
| Name | Type | Description |
|------|------|-------------|
| files_created | List<FileChange> | New files with content |
| files_modified | List<FileChange> | Modified files with diffs |
| tests_generated | List<FileChange> | Test files created |
| implementation_notes | List<Note> | Decisions and rationale |
| coverage_map | Map<Requirement, Implementation> | Traceability matrix |
### State
| Field | Type | Description |
|-------|------|-------------|
| iteration | Number | Current implementation iteration |
| requirements_implemented | Set<RequirementId> | Requirements addressed |
| files_touched | Set<FilePath> | All files modified across iterations |
| scope_baseline | Set<FilePath> | Files expected to be modified |
## OODA Phases
### OBSERVE
Analyze spec and gather implementation context:
\`\`\`
1. PARSE specification:
requirements = extract_requirements(spec)
# FR-001, FR-002, NFR-001, etc.
interfaces = extract_interfaces(spec)
# API contracts, data models, signatures
acceptance_criteria = extract_criteria(spec)
# Testable conditions for each requirement
dependencies = extract_dependencies(spec)
# Other specs, external services, libraries
2. ANALYZE codebase context:
IF codebase_context provided:
patterns = codebase_context.patterns
conventions = codebase_context.conventions
architecture = codebase_context.architecture
ELSE:
# Quick discovery
Execute @loops/exploration/codebase-discovery.md
focus_area: spec.domain
depth: "shallow"
3. DETERMINE scope:
IF target_files provided:
scope_baseline = Set(target_files)
ELSE:
# Infer from spec and architecture
scope_baseline = infer_affected_files(
spec: spec,
architecture: architecture,
interfaces: interfaces
)
4. PLAN implementation order:
# Topological sort by dependency
implementation_order = topological_sort(requirements, by_dependency)
# Group by file to minimize context switches
file_groups = group_requirements_by_file(implementation_order, scope_baseline)
SIGNALS:
requirements: parsed requirements with priorities
scope_baseline: expected files to modify
implementation_order: ordered list of work
\`\`\`
### ORIENT
Design implementation approach for each component:
\`\`\`
1. FOR each file_group in file_groups:
file = file_group.file
reqs = file_group.requirements
# Assess current state
IF file exists:
current_content = Read(file)
existing_code = parse_code(current_content)
# Identify integration points
integration_points = find_where_to_insert(
existing_code,
reqs
)
ELSE:
existing_code = None
integration_points = "new_file"
# Select implementation pattern
pattern = select_pattern(
requirements: reqs,
conventions: conventions,
existing_patterns: patterns
)
# Design code structure
design = {
file: file,
pattern: pattern,
integration_points: integration_points,
new_code_sections: plan_sections(reqs, pattern),
tests_needed: derive_tests(reqs, acceptance_criteria)
}
implementation_designs.append(design)
2. VALIDATE design consistency:
# Check for conflicts between designs
FOR i, design_a in implementation_designs:
FOR j, design_b in implementation_designs[i+1:]:
conflicts = detect_conflicts(design_a, design_b)
IF conflicts:
resolve_conflicts(conflicts)
# Check adherence to conventions
FOR design in implementation_designs:
violations = check_conventions(design, conventions)
IF violations:
adjust_design(design, violations)
3. ESTIMATE complexity:
FOR design in implementation_designs:
design.complexity = estimate_complexity(
new_lines: design.estimated_lines,
integration_difficulty: design.integration_points.difficulty,
test_complexity: design.tests_needed.count
)
\`\`\`
### DECIDE
Commit to implementation strategy:
\`\`\`
1. PRIORITIZE by requirement priority:
must_have = filter(requirements, priority == "must")
should_have = filter(requirements, priority == "should")
could_have = filter(requirements, priority == "could")
# Ensure must-haves are implementable
FOR req in must_have:
IF not has_design(req):
ESCALATE "Cannot implement must-have requirement: {req.id}"
2. DECIDE on iteration scope:
IF iteration == 1:
# First iteration: core structure + must-haves
this_iteration = must_have[:max_per_iteration]
focus = "structure_and_core"
ELIF iteration == 2:
# Second iteration: remaining must + should
this_iteration = remaining_must + should_have[:remaining_capacity]
focus = "complete_and_enhance"
ELSE:
# Later iterations: polish and edge cases
this_iteration = could_have + remaining
focus = "polish"
3. PLAN test strategy:
test_strategy = {
unit_tests: for_each_function,
integration_tests: for_each_interface,
acceptance_tests: for_each_criterion
}
\`\`\`
### ACT
Generate code and tests:
\`\`\`
1. FOR each design in this_iteration_designs:
file = design.file
IF design.integration_points == "new_file":
# Generate new file
content = generate_file(
template: get_file_template(file.type),
sections: design.new_code_sections,
pattern: design.pattern,
conventions: conventions
)
files_created.append({
path: file,
content: content,
requirements_covered: design.requirements
})
ELSE:
# Modify existing file
current_content = Read(file)
FOR section in design.new_code_sections:
insertion_point = design.integration_points[section.id]
new_code = generate_code_section(
section: section,
context: surrounding_code(current_content, insertion_point),
pattern: design.pattern,
conventions: conventions
)
current_content = insert_at(
content: current_content,
position: insertion_point,
code: new_code
)
files_modified.append({
path: file,
original: Read(file),
modified: current_content,
requirements_covered: design.requirements
})
files_touched.add(file)
2. GENERATE tests:
FOR design in this_iteration_designs:
FOR test_spec in design.tests_needed:
test_content = generate_test(
spec: test_spec,
implementation: get_implementation(design),
pattern: test_pattern_for(file.type)
)
test_file = derive_test_path(design.file)
tests_generated.append({
path: test_file,
content: test_content,
tests_requirement: test_spec.requirement_id
})
3. WRITE files:
FOR file_change in files_created + files_modified:
Write(file_change.path, file_change.content)
FOR test_file in tests_generated:
Write(test_file.path, test_file.content)
4. UPDATE coverage map:
FOR req in requirements:
implementation = find_implementation(req, files_touched)
IF implementation:
coverage_map[req.id] = {
files: implementation.files,
lines: implementation.line_ranges,
tests: find_tests(req.id, tests_generated)
}
requirements_implemented.add(req.id)
5. CHECK for spiral:
Execute @loops/orchestration/spiral-detector.md with:
iteration_history: previous_iterations
current_iteration: {
files_modified: files_touched,
checklist_score: len(requirements_implemented) / len(requirements),
scope_baseline: scope_baseline
}
IF spiral_detected:
EMIT spiral_warning
IF severity == "critical":
BREAK loop
6. EMIT progress:
EMIT code_generated {
iteration: iteration,
files_created: len(files_created),
files_modified: len(files_modified),
requirements_covered: len(requirements_implemented),
coverage_percent: len(requirements_implemented) / len(requirements)
}
7. CHECK completion:
IF all(req.id in requirements_implemented for req in must_have):
IF iteration >= max_iterations OR all requirements implemented:
RETURN outputs
ELSE:
iteration += 1
RETURN to OBSERVE for next iteration
\`\`\`
## Termination Conditions
- **Success**: All must-have requirements implemented with passing tests
- **Failure**: Cannot implement must-have requirement or spiral detected
- **Timeout**: \`iteration >= MAX_ITERATIONS\` (default 3)
## Composition
### Can contain (nested loops)
- \`orchestration/spiral-detector\` (inline monitoring)
- \`refinement/code-quality\` (post-generation polish)
### Can be contained by
- \`orchestration/queue-processor\` (batch implementation)
- Workflow commands (\`/spec-orchestrator\`)
### Parallelizable
- Conditional: Independent files can be generated in parallel
- No: Files with dependencies must be sequential
## Signals Emitted
| Signal | When | Payload |
|--------|------|---------|
| \`file_created\` | New file written | \`{ path, lines, requirements }\` |
| \`file_modified\` | Existing file changed | \`{ path, additions, deletions }\` |
| \`test_generated\` | Test file created | \`{ path, test_count, coverage }\` |
| \`code_generated\` | Iteration complete | \`{ iteration, coverage_percent }\` |
| \`spiral_warning\` | Spiral detected | \`{ pattern, severity }\` |
| \`implementation_complete\` | All done | \`{ files, tests, coverage_map }\` |
## Code Generation Templates
### Function Template
\`\`\`typescript
/**
* {description from requirement}
*
* @implements {requirement_id}
* @param {params from spec}
* @returns {return type from spec}
* @throws {error conditions from spec}
*/
{visibility} {async?} function {name}({params}): {return_type} {
// Implementation
}
\`\`\`
### Class Template
\`\`\`typescript
/**
* {description from spec}
*
* @implements {requirement_ids}
*/
{export?} class {ClassName} {implements/extends} {
// Dependencies
constructor({dependencies}) {}
// Public methods from interface
// Private helpers
}
\`\`\`
## Example Usage
\`\`\`markdown
## Implementation Phase
FOR each spec in implementation_queue:
Execute @loops/authoring/code-generation.md with:
INPUT:
spec: spec
codebase_context: discovered_context
CONFIG:
MAX_ITERATIONS: 3
ON code_generated:
LOG "Iteration {iteration}: {coverage_percent}% coverage"
ON spiral_warning:
IF severity == "critical":
ESCALATE to user
ELSE:
CONTINUE with constraints
ON implementation_complete:
RUN tests
IF tests pass:
PROCEED to next spec
ELSE:
ITERATE with test feedback
\`\`\`
`,
metadata: {
"type": "authoring",
"speed": "medium",
"scope": "document",
"description": "Generate implementation code from specifications with iterative refinement.",
"interface_version": "loop-interface@1.0",
"inputs": [],
"outputs": [],
"signals": [],
"composition": {}
},
},
'documentation': {
content: `# Documentation Loop
Generate and maintain documentation from code and specifications.
**Version**: 1.0.0
**Interface**: loop-interface@1.0
## Classification
- **Type**: authoring
- **Speed**: medium (~30s-2min per document)
- **Scope**: document
## Interface
### Inputs
| Name | Type | Required | Description |
|------|------|----------|-------------|
| source | Source | yes | What to document (code, spec, API) |
| doc_type | DocType | yes | Type of documentation to generate |
| audience | Audience | no | Target audience (default: developers) |
| existing_docs | List<FilePath> | no | Existing docs to update/integrate |
| style_guide | StyleGuide | no | Documentation style preferences |
### Outputs
| Name | Type | Description |
|------|------|-------------|
| documents | List<Document> | Generated documentation |
| updates | List<DocUpdate> | Updates to existing docs |
| cross_references | List<CrossRef> | Links between docs |
| coverage | DocCoverage | What's documented vs not |
### State
| Field | Type | Description |
|-------|------|-------------|
| sections_written | Set<SectionId> | Sections completed |
| examples_generated | List<Example> | Code examples created |
## Types
\`\`\`typescript
type Source =
| { type: "code", files: List<FilePath> }
| { type: "spec", document: SpecDocument }
| { type: "api", endpoints: List<Endpoint> }
| { type: "schema", definitions: List<Schema> }
type DocType =
| "readme" // Project/module README
| "api_reference" // API documentation
| "guide" // How-to guide
| "tutorial" // Step-by-step tutorial
| "architecture" // Architecture documentation
| "changelog" // Version changes
| "contributing" // Contribution guidelines
type Audience =
| "developers" // Internal developers
| "api_consumers" // External API users
| "operators" // DevOps/SRE
| "end_users" // Non-technical users
\`\`\`
## OODA Phases
### OBSERVE
Extract documentable content from source:
\`\`\`
1. ANALYZE source type:
IF source.type == "code":
# Extract from code
FOR file in source.files:
content = Read(file)
# Extract documentation elements
exports = extract_exports(content)
functions = extract_functions(content)
classes = extract_classes(content)
types = extract_types(content)
comments = extract_doc_comments(content)
code_elements.append({
file: file,
exports, functions, classes, types, comments
})
ELIF source.type == "spec":
# Extract from specification
spec_elements = {
summary: source.document.summary,
requirements: source.document.requirements,
api: source.document.api_design,
data_model: source.document.data_model,
examples: source.document.examples
}
ELIF source.type == "api":
# Extract from API endpoints
FOR endpoint in source.endpoints:
api_elements.append({
method: endpoint.method,
path: endpoint.path,
params: endpoint.parameters,
request_body: endpoint.request,
response: endpoint.response,
errors: endpoint.errors
})
ELIF source.type == "schema":
# Extract from schema definitions
FOR schema in source.definitions:
schema_elements.append({
name: schema.name,
fields: schema.fields,
relationships: schema.relationships,
constraints: schema.constraints
})
2. IDENTIFY documentation gaps:
IF existing_docs:
FOR doc in existing_docs:
existing_content = Read(doc)
documented_items = extract_documented_items(existing_content)
# Find undocumented items
all_items = collect_all_documentable_items(source)
undocumented = all_items - documented_items
ELSE:
undocumented = all_items
3. GATHER examples:
# Look for existing examples in tests
test_files = find_test_files(source)
FOR test_file in test_files:
examples_from_tests = extract_usage_examples(test_file)
examples.extend(examples_from_tests)
SIGNALS:
documentable_items: items to document
undocumented: gaps to fill
existing_examples: examples found in codebase
\`\`\`
### ORIENT
Plan documentation structure:
\`\`\`
1. SELECT structure based on doc_type:
IF doc_type == "readme":
structure = [
"Title & Badges",
"Description",
"Installation",
"Quick Start",
"Usage",
"API Overview",
"Configuration",
"Contributing",
"License"
]
ELIF doc_type == "api_reference":
structure = [
"Overview",
"Authentication",
"Base URL",
"Endpoints" (grouped by resource),
"Error Handling",
"Rate Limiting",
"Examples"
]
ELIF doc_type == "guide":
structure = [
"Introduction",
"Prerequisites",
"Step-by-Step Instructions",
"Common Patterns",
"Troubleshooting",
"Next Steps"
]
ELIF doc_type == "tutorial":
structure = [
"What You'll Build",
"Prerequisites",
"Step 1..N",
"Testing Your Work",
"Summary",
"Further Reading"
]
ELIF doc_type == "architecture":
structure = [
"System Overview",
"Component Diagram",
"Data Flow",
"Key Design Decisions",
"Integration Points",
"Deployment"
]
2. MAP content to structure:
content_mapping = {}
FOR section in structure:
relevant_items = filter_items_for_section(
documentable_items,
section,
audience
)
content_mapping[section] = relevant_items
3. PLAN examples:
FOR section in content_mapping:
IF section needs examples:
IF existing_examples for section:
planned_examples[section] = existing_examples
ELSE:
planned_examples[section] = generate_example_spec(section)
4. ADJUST for audience:
IF audience == "end_users":
# Remove technical details
remove_sections(["API Overview", "Configuration"])
simplify_language()
ELIF audience == "operators":
# Emphasize operational aspects
expand_sections(["Configuration", "Deployment", "Monitoring"])
ELIF audience == "api_consumers":
# Focus on API usage
expand_sections(["Authentication", "Endpoints", "Examples"])
\`\`\`
### DECIDE
Commit to documentation content:
\`\`\`
1. PRIORITIZE sections:
required_sections = filter(structure, is_required_for(doc_type))
optional_sections = filter(structure, is_optional)
# Must document required sections
FOR section in required_sections:
IF not content_mapping[section]:
FLAG missing_content(section)
2. DECIDE on example generation:
FOR section, example_spec in planned_examples:
IF example_spec.complexity > threshold:
decision = "generate_simple_example"
ELIF existing_examples available:
decision = "adapt_existing_example"
ELSE:
decision = "generate_new_example"
example_decisions[section] = decision
3. DECIDE on cross-references:
FOR item in documentable_items:
related_items = find_related(item, documentable_items)
IF related_items:
cross_references.append({
from: item,
to: related_items,
relationship: "see_also" | "implements" | "uses"
})
\`\`\`
### ACT
Generate documentation:
\`\`\`
1. GENERATE each section:
document_content = []
FOR section in structure:
items = content_mapping[section]
section_content = generate_section(
title: section,
items: items,
style: style_guide,
audience: audience
)
# Add examples if planned
IF section in planned_examples:
example = generate_or_adapt_example(
spec: planned_examples[section],
decision: example_decisions[section],
existing: existing_examples
)
section_content += format_example(example)
examples_generated.append(example)
document_content.append(section_content)
sections_written.add(section)
2. ADD cross-references:
FOR xref in cross_references:
insert_link(
document: document_content,
at: xref.from,
to: xref.to,
text: xref.relationship
)
3. FORMAT document:
IF style_guide:
document = apply_style(document_content, style_guide)
ELSE:
document = format_markdown(document_content)
4. GENERATE updates for existing docs:
IF existing_docs:
FOR existing_doc in existing_docs:
diff = calculate_updates(
existing: Read(existing_doc),
new_items: undocumented relevant to existing_doc
)
IF diff:
updates.append({
path: existing_doc,
changes: diff
})
5. CALCULATE coverage:
coverage = {
total_items: len(documentable_items),
documented: len(sections_written),
with_examples: len(examples_generated),
percentage: len(documented) / len(total_items)
}
6. OUTPUT documents:
FOR doc in documents:
Write(doc.path, doc.content)
FOR update in updates:
apply_update(update)
7. EMIT completion:
EMIT documentation_complete {
doc_type: doc_type,
sections: len(sections_written),
examples: len(examples_generated),
coverage: coverage.percentage
}
\`\`\`
## Termination Conditions
- **Success**: All required sections documented with acceptable coverage
- **Failure**: Cannot extract documentable content from source
- **Timeout**: Max generation time reached
## Composition
### Can contain (nested loops)
- None (leaf authoring loop)
### Can be contained by
- \`authoring/spec-drafting\` (generate docs from spec)
- \`authoring/code-generation\` (generate docs alongside code)
- Workflow commands
### Parallelizable
- Yes: Different doc types can be generated in parallel
- Yes: Different sections can be generated in parallel
## Signals Emitted
| Signal | When | Payload |
|--------|------|---------|
| \`section_written\` | Section complete | \`{ section, word_count }\` |
| \`example_generated\` | Example created | \`{ section, example_type }\` |
| \`gap_identified\` | Missing content | \`{ section, missing_items }\` |
| \`documentation_complete\` | All done | \`{ doc_type, coverage }\` |
## Documentation Templates
### API Endpoint Template
\`\`\`markdown
### {METHOD} {path}
{description}
**Authentication**: {auth_requirement}
**Parameters**
| Name | Type | Required | Description |
|------|------|----------|-------------|
| {name} | {type} | {yes/no} | {description} |
**Request Body**
\`\`\`json
{example_request}
\`\`\`
**Response**
\`\`\`json
{example_response}
\`\`\`
**Errors**
| Code | Description |
|------|-------------|
| {code} | {description} |
\`\`\`
### Function Documentation Template
\`\`\`markdown
### \`{function_name}({params})\`
{description}
**Parameters**
- \`{param}\` ({type}): {description}
**Returns**
{return_type}: {description}
**Example**
\`\`\`{language}
{example_code}
\`\`\`
\`\`\`
## Example Usage
\`\`\`markdown
## Generate API Documentation
Execute @loops/authoring/documentation.md with:
INPUT:
source: { type: "api", endpoints: api_endpoints }
doc_type: "api_reference"
audience: "api_consumers"
style_guide: openapi_style
ON section_written:
LOG "Documented: {section}"
ON documentation_complete:
IF coverage.percentage < 0.9:
FLAG incomplete documentation
ELSE:
PUBLISH to docs site
\`\`\`
`,
metadata: {
"type": "authoring",
"speed": "medium",
"scope": "document",
"description": "Generate and maintain documentation from code and specifications.",
"interface_version": "loop-interface@1.0",
"inputs": [],
"outputs": [],
"signals": [],
"composition": {}
},
},
'spec-drafting': {
content: `# Spec Drafting Loop
Draft specification documents with appropriate structure and depth.
**Version**: 1.0.0
**Interface**: loop-interface@1.0
## Classification
- **Type**: authoring
- **Speed**: medium (~30s-2min per spec)
- **Scope**: document
## Interface
### Inputs
| Name | Type | Required | Description |
|------|------|----------|-------------|
| spec_summary | SpecSummary | yes | Spec metadata from exploration phase |
| requirements | List<Requirement> | yes | Requirements assigned to this spec |
| dependencies | List<SpecRef> | no | Dependent specs (for cross-referencing) |
| depth | "shallow" \\| "standard" \\| "comprehensive" | no | Detail level (default: standard) |
| template_overrides | Map<Section, Template> | no | Custom section templates |
### Outputs
| Name | Type | Description |
|------|------|-------------|
| spec_document | SpecDocument | Complete spec markdown |
| section_scores | Map<Section, Score> | Quality score per section |
| tbd_items | List<TBDItem> | Items marked for later resolution |
| cross_references | List<CrossRef> | References to other specs/docs |
| diagrams | List<Diagram> | Generated mermaid diagrams |
### State
| Field | Type | Description |
|-------|------|-------------|
| current_section | Section | Section being drafted |
| draft_content | Map<Section, Text> | Content drafted so far |
| iteration | Number | Refinement iteration count |
## OODA Phases
### OBSERVE
Gather spec-specific context:
\`\`\`
1. REVIEW spec summary:
→ What is the core purpose?
→ What boundaries were set?
→ What complexity was estimated?
2. LOAD relevant existing code:
IF spec is enhancement/refactor:
→ Read current implementation files
→ Extract interfaces and contracts
→ Note existing patterns and conventions
IF spec has dependencies:
→ Read dependent spec drafts
→ Extract interface requirements
→ Note integration points
3. GATHER terminology:
→ Domain-specific terms used
→ Existing naming conventions in codebase
→ Industry standard terminology
4. IDENTIFY diagram candidates:
→ Architecture overview needed?
→ Data flow visualization?
→ State machine?
→ Sequence diagrams?
OBSERVATION OUTPUTS:
existing_code: relevant code snippets
interface_contracts: what other specs expect
terminology: domain glossary
diagram_needs: list of diagrams to generate
\`\`\`
### ORIENT
Plan document structure and content:
\`\`\`
1. SELECT spec type:
FEATURE (new capability):
sections = [
"Executive Summary",
"Background",
"User Stories",
"Functional Requirements",
"Non-Functional Requirements",
"API Design",
"Data Model",
"Migration Path",
"Acceptance Criteria"
]
ARCHITECTURE (system design):
sections = [
"Problem Statement",
"Constraints & Assumptions",
"Architecture Overview",
"Component Design",
"Integration Points",
"Trade-off Analysis",
"Alternatives Considered",
"Risk Assessment"
]
REFACTOR (improving existing):
sections = [
"Current State Analysis",
"Problems to Address",
"Target State",
"Migration Strategy",
"Backwards Compatibility",
"Rollback Plan",
"Acceptance Criteria"
]
2. DETERMINE depth per section:
IF depth == "shallow":
words_per_section = 50-100
examples = minimal
edge_cases = none
IF depth == "standard":
words_per_section = 100-300
examples = key scenarios
edge_cases = major ones
IF depth == "comprehensive":
words_per_section = 200-500
examples = exhaustive
edge_cases = all identified
3. PLAN content strategy:
For each section:
priority = Required | Recommended | Optional | Defer
Required: Must include for spec validity
Recommended: Should include for completeness
Optional: Include if context exists
Defer: Mark TBD, flag for follow-up
4. MAP requirements to sections:
For each requirement:
→ Which section does it belong in?
→ Does it need acceptance criteria?
→ Does it need examples?
\`\`\`
### DECIDE
Commit to content for each section:
\`\`\`
For each section in order:
ASSESS readiness:
have_context = relevant info gathered?
have_requirements = requirements mapped?
have_examples = examples identified?
IF have_context AND have_requirements:
action = "draft"
ELIF section.priority == "Required":
action = "draft_with_placeholders"
ELIF section.priority == "Defer":
action = "mark_tbd"
ELSE:
action = "skip"
CONTENT DECISIONS:
include_code_examples: boolean
include_diagrams: list of diagram types
include_tables: list of table types
cross_ref_needed: list of other specs to reference
\`\`\`
### ACT
Generate the spec document:
\`\`\`
1. WRITE header:
"""
# {spec_summary.name}
**Version:** 1.0.0
**Date:** {today}
**Status:** Draft
**Author:** AI-assisted via @loops/authoring/spec-drafting
**Depends on:** {dependencies}
---
## Table of Contents
{generated_toc}
---
"""
2. FOR each section:
IF action == "draft":
content = generate_section_content(
section=section,
requirements=mapped_requirements,
depth=depth,
existing_code=relevant_snippets
)
IF include_code_examples:
content += generate_code_examples()
IF include_diagrams:
diagram = generate_mermaid_diagram(diagram_type)
content += diagram
diagrams.append(diagram)
IF include_tables:
content += generate_tables()
section_scores[section] = assess_section_quality(content)
ELIF action == "draft_with_placeholders":
content = generate_section_skeleton(section)
content += "[TBD: {missing_info}]"
tbd_items.append({section, missing_info})
ELIF action == "mark_tbd":
content = "**[TBD]** This section will be completed when {condition}."
tbd_items.append({section, condition})
draft_content[section] = content
3. GENERATE cross-references:
For each dependency:
Add reference in relevant sections
cross_references.append({
from_section: section,
to_spec: dependency,
relationship: "depends_on" | "extends" | "implements"
})
4. ASSEMBLE document:
spec_document = header
For each section in sections:
spec_document += f"\\n\\n## {section.number}. {section.title}\\n\\n"
spec_document += draft_content[section]
spec_document += generate_appendix(terminology, references)
5. EMIT completion signal:
overall_score = avg(section_scores.values())
EMIT spec_drafted {
spec: spec_summary.name,
score: overall_score,
tbd_count: len(tbd_items),
diagram_count: len(diagrams)
}
\`\`\`
## Termination Conditions
- **Success**: All Required sections drafted with \`score >= 0.7\`
- **Failure**: Cannot gather minimum context for Required sections
- **Timeout**: Time limit reached → emit partial spec with TBDs
## Composition
### Can contain (nested loops)
- \`refinement/requirement-quality\` (inline requirement polishing)
- \`authoring/documentation\` (for verbose sections)
### Can be contained by
- \`exploration/problem-space\` (post-exploration)
- \`orchestration/queue-processor\` (batch spec drafting)
### Parallelizable
- Yes: Independent specs can be drafted in parallel
- No: Dependent specs must be drafted in order
## Signals Emitted
| Signal | When | Payload |
|--------|------|---------|
| \`section_drafted\` | Each section complete | \`{ section, score, word_count }\` |
| \`diagram_generated\` | Diagram created | \`{ type, mermaid_source }\` |
| \`tbd_flagged\` | Item deferred | \`{ section, reason }\` |
| \`spec_drafted\` | Spec complete | \`{ spec_name, overall_score, tbd_count }\` |
| \`cross_ref_needed\` | Missing dependency | \`{ from_spec, to_spec, reason }\` |
## Section Templates
### Requirements Table Template
\`\`\`markdown
| ID | Requirement | Priority | Acceptance Criteria |
|----|-------------|----------|---------------------|
| FR-001 | {specific requirement} | Must | {testable criteria} |
| FR-002 | {requirement} | Should | {criteria} |
\`\`\`
### Trade-off Table Template
\`\`\`markdown
| Option | Pros | Cons | Decision |
|--------|------|------|----------|
| A: {name} | {pros} | {cons} | {chosen/rejected + why} |
| B: {name} | {pros} | {cons} | {chosen/rejected + why} |
\`\`\`
### Risk Table Template
\`\`\`markdown
| Risk | Severity | Likelihood | Mitigation |
|------|----------|------------|------------|
| {risk description} | High/Med/Low | High/Med/Low | {mitigation strategy} |
\`\`\`
## Example Usage
\`\`\`markdown
## Phase 2: Authoring
For each spec in spec_inventory:
Execute @loops/authoring/spec-drafting.md with:
INPUT:
spec_summary: spec
requirements: spec.requirements
dependencies: spec.dependencies
depth: "standard"
ON section_drafted:
LOG progress
ON tbd_flagged:
RECORD for later resolution
ON spec_drafted:
IF score >= 0.7:
PROCEED to refinement
ELSE:
FLAG for manual review
\`\`\`
## Quality Checklist
Before marking spec as drafted:
- [ ] All Required sections have content (not just placeholders)
- [ ] Requirements have unique IDs (FR-001, NFR-001, etc.)
- [ ] Diagrams render correctly (valid mermaid syntax)
- [ ] Cross-references are valid (linked specs exist)
- [ ] TBD items have clear resolution conditions
- [ ] Glossary covers all domain-specific terms
`,
metadata: {
"type": "authoring",
"speed": "medium",
"scope": "document",
"description": "Draft specification documents with appropriate structure and depth.",
"interface_version": "loop-interface@1.0",
"inputs": [],
"outputs": [],
"signals": [],
"composition": {}
},
}
},
'exploration': {
'codebase-discovery': {
content: `# Codebase Discovery Loop
Map existing code patterns, architecture, and conventions before making changes.
**Version**: 1.0.0
**Interface**: loop-interface@1.0
## Classification
- **Type**: exploration
- **Speed**: medium (~1-3 minutes)
- **Scope**: session
## Interface
### Inputs
| Name | Type | Required | Description |
|------|------|----------|-------------|
| focus_area | Text | yes | Area of codebase to explore (e.g., "authentication", "data layer") |
| entry_points | List<FilePath> | no | Starting files/directories to explore from |
| depth | "shallow" \\| "standard" \\| "deep" | no | Exploration depth (default: standard) |
| pattern_types | List<PatternType> | no | Specific patterns to look for |
### Outputs
| Name | Type | Description |
|------|------|-------------|
| architecture_map | ArchitectureMap | High-level structure discovered |
| patterns | List<PatternMatch> | Patterns found with locations |
| conventions | List<Convention> | Coding conventions detected |
| dependencies | DependencyGraph | Internal/external dependency map |
| entry_points | List<EntryPoint> | Key files for the focus area |
| recommendations | List<Recommendation> | Suggested approaches based on findings |
### State
| Field | Type | Description |
|-------|------|-------------|
| explored_files | Set<FilePath> | Files already examined |
| pattern_cache | Map<Pattern, List<Location>> | Cached pattern matches |
## OODA Phases
### OBSERVE
Scan codebase for relevant structure:
\`\`\`
1. IDENTIFY entry points:
IF entry_points provided:
start_from = entry_points
ELSE:
# Use semantic search to find relevant areas
start_from = SemanticSearch(
query: f"Where is {focus_area} implemented?",
target_directories: []
)
2. MAP directory structure:
→ Glob: find all source files in relevant directories
→ Build file tree with types (source, test, config, docs)
→ Identify module boundaries
3. SCAN for patterns:
ARCHITECTURAL PATTERNS:
→ Grep: "class.*Controller|Handler|Service|Repository"
→ Grep: "export.*function|const.*=.*=>"
→ Detect: MVC, layered, hexagonal, event-driven
DEPENDENCY PATTERNS:
→ Parse imports/requires
→ Build dependency graph
→ Identify circular dependencies
NAMING CONVENTIONS:
→ File naming: kebab-case, camelCase, PascalCase
→ Function naming: verbs, prefixes (get, set, is, has)
→ Variable naming: constants, privates
ERROR HANDLING:
→ Grep: "try|catch|throw|Error|Result|Either"
→ Detect: exceptions, result types, error codes
4. EXTRACT code samples:
For each pattern found:
→ Read surrounding context (10-20 lines)
→ Note file path and line numbers
→ Classify as: core, utility, test, config
SIGNALS:
file_tree: directory structure
pattern_matches: [{pattern, location, snippet}]
import_graph: dependency relationships
naming_samples: examples of naming conventions
\`\`\`
### ORIENT
Interpret findings into actionable insights:
\`\`\`
1. BUILD architecture map:
layers = detect_layers(file_tree, import_graph)
# e.g., presentation → business → data → infrastructure
modules = detect_modules(file_tree, import_graph)
# e.g., auth, users, documents, notifications
boundaries = detect_boundaries(modules, import_graph)
# Where modules interact, API surfaces
2. CLASSIFY patterns:
For each pattern_match:
frequency = count occurrences
consistency = how consistently applied (0-1)
scope = where it applies (global, module, file)
IF frequency > threshold AND consistency > 0.8:
conventions.append(as_convention(pattern_match))
ELSE:
patterns.append(pattern_match)
3. ASSESS health indicators:
coupling_score = analyze_coupling(import_graph)
# High coupling = changes ripple widely
cohesion_score = analyze_cohesion(modules)
# Low cohesion = modules do too many things
test_coverage = estimate_coverage(file_tree)
# Ratio of test files to source files
documentation = assess_documentation(file_tree)
# README, comments, JSDoc presence
4. IDENTIFY entry points for focus_area:
entry_points = []
For each file related to focus_area:
centrality = calculate_centrality(file, import_graph)
relevance = semantic_similarity(file, focus_area)
IF centrality > threshold OR relevance > 0.8:
entry_points.append({
file: file,
reason: "high centrality" | "direct match",
suggested_action: "start here" | "reference only"
})
\`\`\`
### DECIDE
Determine recommendations and completeness:
\`\`\`
1. EVALUATE exploration completeness:
coverage = len(explored_files) / estimated_relevant_files
pattern_saturation = new_patterns_last_iteration / total_patterns
IF coverage < 0.7 AND depth != "shallow":
decision = "EXPAND"
action = explore_adjacent_areas()
ELIF pattern_saturation > 0.1 AND depth == "deep":
decision = "CONTINUE"
action = explore_edge_cases()
ELSE:
decision = "COMPLETE"
action = generate_recommendations()
2. GENERATE recommendations:
recommendations = []
# Based on conventions found
IF conventions.length > 0:
recommendations.append({
type: "follow_convention",
convention: most_relevant_convention,
rationale: "Maintains consistency with existing code"
})
# Based on architecture
IF architecture_map.layers:
recommendations.append({
type: "layer_placement",
suggested_layer: appropriate_layer_for(focus_area),
rationale: "Follows existing layered architecture"
})
# Based on patterns
IF similar_pattern_exists:
recommendations.append({
type: "extend_pattern",
pattern: existing_pattern,
rationale: "Similar functionality already implemented"
})
# Based on health
IF coupling_score > 0.7:
recommendations.append({
type: "caution",
warning: "High coupling detected",
suggestion: "Consider interface abstraction"
})
\`\`\`
### ACT
Produce discovery outputs:
\`\`\`
1. COMPILE architecture map:
architecture_map = {
layers: detected_layers,
modules: detected_modules,
boundaries: detected_boundaries,
diagram: generate_mermaid_diagram()
}
2. FORMAT pattern documentation:
For each pattern in patterns:
pattern.examples = top_3_examples
pattern.usage_guide = how_to_apply
3. EMIT discovery complete:
EMIT codebase_discovered {
focus_area: focus_area,
files_explored: len(explored_files),
patterns_found: len(patterns),
conventions_found: len(conventions),
      health_score: avg(1 - coupling_score, cohesion_score, test_coverage)  # coupling inverted: high coupling is unhealthy
}
4. RETURN outputs:
RETURN {
architecture_map,
patterns,
conventions,
dependencies,
entry_points,
recommendations
}
\`\`\`
## Termination Conditions
- **Success**: Sufficient coverage achieved with actionable recommendations
- **Failure**: Cannot access codebase or no relevant code found
- **Timeout**: Max exploration time reached (preserve partial results)
## Composition
### Can contain (nested loops)
- None (leaf exploration loop)
### Can be contained by
- \`exploration/problem-space\` (for code-related problems)
- \`authoring/code-generation\` (pre-implementation discovery)
- \`refactoring/*\` workflows
### Parallelizable
- Conditional: Different focus areas can explore in parallel
## Signals Emitted
| Signal | When | Payload |
|--------|------|---------|
| \`area_explored\` | Directory/module scanned | \`{ area, files_count, patterns_found }\` |
| \`pattern_discovered\` | New pattern identified | \`{ pattern, frequency, examples }\` |
| \`convention_detected\` | Consistent pattern promoted | \`{ convention, consistency_score }\` |
| \`codebase_discovered\` | Exploration complete | \`{ summary_stats, health_scores }\` |
## Pattern Types
\`\`\`typescript
type PatternType =
| "architectural" // MVC, layered, hexagonal
| "structural" // Factory, singleton, adapter
| "behavioral" // Observer, strategy, command
| "error_handling" // Try/catch, Result, Either
| "testing" // Unit, integration, e2e patterns
| "naming" // Conventions for identifiers
| "file_organization" // How files/folders are structured
\`\`\`
## Example Usage
\`\`\`markdown
## Pre-Implementation Discovery
Execute @loops/exploration/codebase-discovery.md with:
INPUT:
focus_area: "user authentication"
depth: "standard"
pattern_types: ["architectural", "error_handling"]
ON pattern_discovered:
LOG "Found pattern: {pattern.name}"
ON codebase_discovered:
USE architecture_map to inform design
USE conventions to maintain consistency
USE entry_points to locate integration points
\`\`\`
## Output Example
\`\`\`yaml
architecture_map:
layers:
- name: "API"
path: "src/api/"
responsibility: "HTTP handlers, routing"
- name: "Services"
path: "src/services/"
responsibility: "Business logic"
- name: "Data"
path: "src/repositories/"
responsibility: "Database access"
patterns:
- name: "Repository Pattern"
frequency: 8
locations: ["src/repositories/*.ts"]
example: |
export class UserRepository {
async findById(id: string): Promise<User | null>
async save(user: User): Promise<User>
}
conventions:
- name: "Error Result Type"
pattern: "Result<T, E> for fallible operations"
consistency: 0.92
example: "const result = await service.process()"
recommendations:
- type: "follow_convention"
convention: "Repository Pattern"
rationale: "All data access uses repositories"
- type: "extend_pattern"
pattern: "UserRepository"
rationale: "Similar to existing auth repositories"
\`\`\`
`,
metadata: {
"type": "exploration",
"speed": "medium",
"scope": "session",
"description": "Map existing code patterns, architecture, and conventions before making changes.",
"interface_version": "loop-interface@1.0",
"inputs": [],
"outputs": [],
"signals": [],
"composition": {}
},
},
'domain-research': {
content: `# Domain Research Loop
Gather external context, standards, and best practices for a domain.
**Version**: 1.0.0
**Interface**: loop-interface@1.0
## Classification
- **Type**: exploration
- **Speed**: slow (~2-5 minutes)
- **Scope**: session
## Interface
### Inputs
| Name | Type | Required | Description |
|------|------|----------|-------------|
| domain | Text | yes | Domain or topic to research |
| questions | List<Text> | no | Specific questions to answer |
| source_types | List<SourceType> | no | Types of sources to consult |
| depth | "overview" \\| "detailed" \\| "comprehensive" | no | Research depth (default: detailed) |
### Outputs
| Name | Type | Description |
|------|------|-------------|
| findings | List<Finding> | Key information discovered |
| standards | List<Standard> | Relevant standards and specifications |
| best_practices | List<BestPractice> | Industry best practices |
| patterns | List<DomainPattern> | Common architectural patterns |
| terminology | Map<Term, Definition> | Domain glossary |
| sources | List<Source> | All sources consulted |
| unanswered | List<Text> | Questions that couldn't be answered |
### State
| Field | Type | Description |
|-------|------|-------------|
| queries_executed | List<Query> | Searches performed |
| sources_consulted | Set<URL> | URLs already read |
| confidence_scores | Map<Finding, Score> | Confidence per finding |
## OODA Phases
### OBSERVE
Gather information from multiple source types:
\`\`\`
1. FORMULATE search queries:
base_queries = [
f"{domain} best practices",
f"{domain} architecture patterns",
f"{domain} standards specification",
f"{domain} implementation guide"
]
IF questions provided:
specific_queries = [format_as_query(q) for q in questions]
queries = base_queries + specific_queries
ELSE:
queries = base_queries
2. EXECUTE searches by source type:
IF "web" in source_types:
web_results = []
FOR each query in queries:
results = WebSearch(query, num_results=5)
web_results.extend(results)
IF "documentation" in source_types:
doc_results = []
# Search official documentation sites
doc_sites = identify_official_docs(domain)
FOR each site in doc_sites:
results = WebSearch(f"site:{site} {domain}")
doc_results.extend(results)
IF "academic" in source_types:
academic_results = []
# Search academic sources
results = WebSearch(f"{domain} research paper OR whitepaper")
academic_results.extend(results)
IF "github" in source_types:
github_results = []
# Search for reference implementations
results = WebSearch(f"site:github.com {domain} example OR reference")
github_results.extend(results)
3. SCRAPE and extract content:
FOR each result in all_results:
IF result.url not in sources_consulted:
content = Scrape(result.url)
extracted = extract_relevant_sections(content, domain)
sources_consulted.add(result.url)
raw_findings.append({
source: result.url,
content: extracted,
source_type: result.type,
query: result.query
})
4. IDENTIFY standards:
standard_patterns = [
r"RFC\\s*\\d+", # IETF RFCs
r"ISO\\s*\\d+", # ISO standards
r"W3C\\s+\\w+", # W3C specs
r"OWASP\\s+\\w+", # Security standards
r"PCI\\s*DSS", # Payment standards
r"GDPR|CCPA|HIPAA" # Compliance
]
FOR each finding in raw_findings:
FOR each pattern in standard_patterns:
matches = regex_find(pattern, finding.content)
IF matches:
standards_mentioned.extend(matches)
SIGNALS:
raw_findings: unprocessed search results
standards_mentioned: potential standards to investigate
source_coverage: types of sources consulted
\`\`\`
### ORIENT
Synthesize findings into structured knowledge:
\`\`\`
1. DEDUPLICATE and consolidate:
# Group findings by topic
topic_groups = cluster_by_similarity(raw_findings)
# Merge overlapping information
FOR each group in topic_groups:
consolidated = merge_findings(group)
consolidated.confidence = calculate_confidence(
source_count=len(group),
source_diversity=unique_source_types(group),
recency=avg_publish_date(group)
)
findings.append(consolidated)
2. EXTRACT best practices:
practice_indicators = [
"best practice",
"recommended",
"should",
"always",
"never",
"avoid",
"prefer"
]
FOR each finding in findings:
practices = extract_with_indicators(finding, practice_indicators)
FOR each practice in practices:
best_practices.append({
practice: practice.text,
source: practice.source,
rationale: practice.context,
confidence: finding.confidence
})
3. IDENTIFY patterns:
pattern_indicators = [
"pattern",
"architecture",
"approach",
"design",
"structure",
"model"
]
FOR each finding in findings:
pattern_mentions = extract_with_indicators(finding, pattern_indicators)
FOR each mention in pattern_mentions:
IF is_concrete_pattern(mention):
patterns.append({
name: extract_pattern_name(mention),
description: mention.context,
use_case: extract_use_case(mention),
trade_offs: extract_trade_offs(mention)
})
4. BUILD terminology:
# Extract definitions
definition_patterns = [
r"(\\w+)\\s+is\\s+(?:a|an|the)\\s+(.+)",
r"(\\w+):\\s+(.+)",
r"(\\w+)\\s+refers to\\s+(.+)"
]
FOR each finding in findings:
definitions = extract_definitions(finding, definition_patterns)
FOR term, definition in definitions:
IF term.lower() related_to domain:
terminology[term] = definition
5. ASSESS answer coverage:
IF questions provided:
FOR each question in questions:
answered = any(
finding answers question
for finding in findings
)
IF not answered:
unanswered.append(question)
\`\`\`
### DECIDE
Evaluate research completeness:
\`\`\`
1. CHECK coverage metrics:
source_diversity = len(unique(source_types consulted))
   question_coverage = 1.0 if not questions else (len(questions) - len(unanswered)) / len(questions)
confidence_avg = avg(finding.confidence for finding in findings)
2. DETERMINE next action:
IF depth == "comprehensive" AND confidence_avg < 0.8:
decision = "EXPAND"
action = generate_follow_up_queries(low_confidence_findings)
ELIF len(unanswered) > 0 AND depth != "overview":
decision = "DEEP_DIVE"
action = focused_search_for(unanswered)
ELIF source_diversity < expected_for_depth:
decision = "DIVERSIFY"
action = search_missing_source_types()
ELSE:
decision = "COMPLETE"
action = finalize_outputs()
3. VALIDATE findings:
# Cross-reference between sources
FOR each finding in findings:
corroboration = count_sources_agreeing(finding)
IF corroboration < 2 AND finding.confidence < 0.7:
finding.flag = "needs_verification"
\`\`\`
### ACT
Produce research outputs:
\`\`\`
1. PRIORITIZE findings:
findings.sort(by=[
confidence descending,
relevance_to_questions descending,
recency descending
])
2. FORMAT outputs:
# Structure standards with details
FOR each standard in standards:
IF standard not already detailed:
detail = lookup_standard_details(standard)
standards_detailed.append({
identifier: standard,
full_name: detail.name,
relevance: detail.relevance_to_domain,
key_requirements: detail.requirements,
link: detail.url
})
# Structure best practices by category
best_practices_grouped = group_by_category(best_practices)
3. GENERATE summary:
research_summary = {
domain: domain,
key_findings: top_5_findings,
applicable_standards: standards_detailed,
recommended_practices: top_5_practices,
common_patterns: patterns,
terminology_count: len(terminology),
confidence_level: confidence_avg,
gaps: unanswered
}
4. EMIT completion:
EMIT research_complete {
domain: domain,
findings_count: len(findings),
standards_count: len(standards),
practices_count: len(best_practices),
coverage: question_coverage,
confidence: confidence_avg
}
5. RETURN:
RETURN {
findings,
standards: standards_detailed,
best_practices: best_practices_grouped,
patterns,
terminology,
sources,
unanswered
}
\`\`\`
## Termination Conditions
- **Success**: Sufficient coverage with acceptable confidence
- **Failure**: No relevant sources found or all searches fail
- **Timeout**: Max research time reached (preserve partial results)
## Composition
### Can contain (nested loops)
- None (leaf exploration loop)
### Can be contained by
- \`exploration/problem-space\` (for domain context)
- \`authoring/spec-drafting\` (for standards compliance)
- \`verification/fact-checking\` (for claim verification)
### Parallelizable
- Yes: Different questions can be researched in parallel
- Yes: Different source types can be searched in parallel
## Signals Emitted
| Signal | When | Payload |
|--------|------|---------|
| \`source_consulted\` | URL scraped | \`{ url, source_type, relevance }\` |
| \`finding_extracted\` | Key info found | \`{ finding, confidence, sources }\` |
| \`standard_identified\` | Standard referenced | \`{ standard, relevance }\` |
| \`research_complete\` | Research done | \`{ summary_stats }\` |
## Source Types
\`\`\`typescript
type SourceType =
| "web" // General web search
| "documentation" // Official docs
| "academic" // Papers, whitepapers
| "github" // Reference implementations
| "stackoverflow" // Community knowledge
| "rfc" // IETF standards
| "spec" // W3C, ECMA specs
\`\`\`
## Example Usage
\`\`\`markdown
## Research Phase
Execute @loops/exploration/domain-research.md with:
INPUT:
domain: "OAuth 2.0 authentication"
questions: [
"What are the security best practices for token storage?",
"How should refresh tokens be handled?",
"What are the PKCE requirements?"
]
source_types: ["documentation", "rfc", "web"]
depth: "detailed"
ON standard_identified:
ADD to spec requirements section
ON research_complete:
USE findings to inform design
USE terminology for spec glossary
INCLUDE standards in compliance section
\`\`\`
## Output Example
\`\`\`yaml
findings:
- topic: "Token Storage"
content: "Access tokens should be stored in memory, not localStorage..."
confidence: 0.95
sources: ["oauth.net", "auth0.com/docs"]
standards:
- identifier: "RFC 6749"
full_name: "The OAuth 2.0 Authorization Framework"
relevance: "Core specification"
key_requirements: ["Authorization endpoint", "Token endpoint"]
- identifier: "RFC 7636"
full_name: "PKCE for OAuth Public Clients"
relevance: "Required for SPAs and mobile apps"
best_practices:
security:
- practice: "Use PKCE for all public clients"
rationale: "Prevents authorization code interception"
- practice: "Store tokens in memory, not localStorage"
rationale: "Prevents XSS token theft"
terminology:
"Access Token": "Short-lived credential for API access"
"Refresh Token": "Long-lived credential for obtaining new access tokens"
"PKCE": "Proof Key for Code Exchange, prevents code interception"
\`\`\`
`,
metadata: {
"type": "exploration",
"speed": "slow",
"scope": "session",
"description": "Gather external context, standards, and best practices for a domain.",
"interface_version": "loop-interface@1.0",
"inputs": [],
"outputs": [],
"signals": [],
"composition": {}
},
},
'problem-space': {
content: `# Problem Space Exploration Loop
Understand the problem space before committing to solution boundaries.
**Version**: 1.0.0
**Interface**: loop-interface@1.0
## Classification
- **Type**: exploration
- **Speed**: slow (~2-5 minutes)
- **Scope**: session
## Interface
### Inputs
| Name | Type | Required | Description |
|------|------|----------|-------------|
| prompt | Text | yes | User's request or problem statement |
| codebase_context | Text | no | Summary of relevant codebase areas |
| constraints | List<Text> | no | Known constraints or requirements |
| prior_unknowns | List<Unknown> | no | Previously identified unknowns (for resume) |
### Outputs
| Name | Type | Description |
|------|------|-------------|
| explicit_requirements | List<Text> | Requirements explicitly stated in prompt |
| implicit_requirements | List<Text> | Requirements implied but not stated |
| existing_patterns | List<PatternMatch> | Patterns found in codebase |
| domain_context | List<Text> | External context gathered |
| unknowns | List<Unknown> | Questions that need resolution |
| stakeholder_concerns | Map<Stakeholder, List<Concern>> | Concerns by stakeholder type |
| scope_estimate | ScopeEstimate | Estimated scope and complexity |
| spec_inventory | List<SpecSummary> | Specs needed (if proceeding) |
| decision | Decision | Loop outcome (clarify/decompose/research/iterate/proceed) |
### State
| Field | Type | Description |
|-------|------|-------------|
| iteration | Number | Current iteration count |
| confidence | Score | Confidence in current understanding |
| clarifications | List<QA> | User clarifications received |
## OODA Phases
### OBSERVE
Gather information about the problem space:
\`\`\`
1. PARSE prompt for explicit signals:
- Action verbs → what needs to happen (implement, add, refactor, fix)
- Nouns → entities and concepts involved
- Qualifiers → constraints (fast, secure, simple)
- Scope markers → boundaries (only, all, except)
2. SEARCH codebase for implicit context:
→ SemanticSearch: "What is the existing architecture for [topic]?"
→ SemanticSearch: "How is [concept] currently implemented?"
→ Grep: patterns matching key terms
→ Glob: related file structures
3. SEARCH external sources (if domain_research enabled):
→ WebSearch: "[topic] best practices"
→ WebSearch: "[technology] architecture patterns"
4. CATALOG unknowns:
→ What decisions haven't been made?
→ What constraints aren't specified?
→ What edge cases aren't addressed?
→ What terminology is ambiguous?
5. RECORD all signals:
explicit_requirements ← parsed requirements
implicit_requirements ← inferred requirements
existing_patterns ← code matches with fit scores
domain_context ← external knowledge
unknowns ← questions needing answers
\`\`\`
### ORIENT
Interpret gathered information through multiple lenses:
\`\`\`
1. PATTERN MATCHING:
For each existing_pattern:
→ How well does it fit the requirements?
→ What are the trade-offs if we extend it?
→ What would we need to change?
For each domain_context item:
→ Does this suggest an architecture?
→ Are there proven patterns we should use?
2. STAKEHOLDER MODELING:
AS developer:
→ What will be hard to implement?
→ What will be hard to test?
→ What will be hard to maintain?
AS user:
→ What could go wrong from their perspective?
→ What do they actually need (vs what they asked for)?
→ What would delight them?
AS operator:
→ What could break in production?
→ How will this be monitored?
→ How will this be deployed?
AS security:
→ What could be exploited?
→ Where are the trust boundaries?
→ What data is sensitive?
3. SCOPE ESTIMATION:
Count requirements → requirement_count
Assess complexity per requirement → complexity_scores
Check for dependencies → dependency_graph
scope_estimate = {
spec_count: estimated number of specs needed,
complexity: low | medium | high | very_high,
confidence: how sure are we about this estimate
}
4. UNKNOWN PRIORITIZATION:
For each unknown:
→ HIGH: Blocks all progress, must resolve first
→ MEDIUM: Affects design but can proceed with assumptions
→ LOW: Nice to know, can defer
critical_unknowns = filter(unknowns, importance == "high")
\`\`\`
### DECIDE
Choose next action based on orientation:
\`\`\`
DECISION TREE:
IF critical_unknowns.length > 0:
decision = "clarify"
action = generate_clarifying_questions(critical_unknowns)
ELIF scope_estimate.complexity == "very_high" AND scope_estimate.spec_count > MAX_SPECS:
decision = "decompose"
action = split_into_subproblems()
ELIF max(pattern.fit_score for pattern in existing_patterns) < 0.6:
decision = "research"
action = expand_search_scope()
ELIF confidence < CONFIDENCE_THRESHOLD AND iteration < MAX_ITERATIONS:
decision = "iterate"
action = refine_understanding()
ELSE:
decision = "proceed"
action = generate_spec_inventory()
CONFIDENCE FACTORS:
+ 0.1 for each clarified unknown
+ 0.1 for each high-fit pattern found
+ 0.1 for comprehensive stakeholder analysis
- 0.1 for each remaining critical unknown
- 0.1 for scope_estimate.complexity == "very_high"
\`\`\`
### ACT
Execute the decided action:
\`\`\`
IF decision == "clarify":
FORMAT clarifying questions:
"""
CLARIFICATION NEEDED:
Before proceeding, please clarify:
1. [HIGH] {unknown.question}
Options: {generated_options}
2. [MEDIUM] {unknown.question}
...
"""
WAIT for user response
UPDATE unknowns with answers
RETURN to OBSERVE
IF decision == "decompose":
SPLIT problem into sub-problems:
For each sub_problem:
SPAWN child exploration loop
child_input = {
prompt: sub_problem.description,
constraints: sub_problem.constraints,
prior_unknowns: relevant unknowns
}
MERGE child outputs into unified spec_inventory
RETURN merged results
IF decision == "research":
EXPAND search scope:
→ Search for related specifications in docs/
→ Search for similar open source projects
→ Search for architectural patterns by name
ADD new context to domain_context
RETURN to OBSERVE
IF decision == "proceed":
GENERATE spec inventory:
spec_inventory = []
For each logical unit identified:
spec = {
name: descriptive_name,
scope_summary: what this spec covers,
requirements: assigned requirements,
dependencies: other specs this depends on,
complexity: estimated complexity,
status: "pending"
}
spec_inventory.append(spec)
ORDER specs by dependency (topological sort)
EMIT signal: exploration_complete
RETURN spec_inventory
\`\`\`
## Termination Conditions
- **Success**: \`decision == "proceed"\` with \`confidence >= CONFIDENCE_THRESHOLD\`
- **Failure**: User cancels during clarification, or decomposition fails
- **Timeout**: \`iteration >= MAX_ITERATIONS\` (default 3) → force proceed with caveats
## Composition
### Can contain (nested loops)
- \`exploration/problem-space\` (for recursive decomposition)
- \`exploration/domain-research\` (for expanded context)
### Can be contained by
- Workflow commands (\`/spec-designer\`, \`/implement-spec\`)
- Orchestration loops (\`orchestration/queue-processor\`)
### Parallelizable
- Conditional: Yes for decomposed sub-problems, No for single problem
## Signals Emitted
| Signal | When | Payload |
|--------|------|---------|
| \`clarification_requested\` | Need user input | \`{ questions: List<Question> }\` |
| \`decomposition_started\` | Splitting into sub-problems | \`{ sub_problems: List<SubProblem> }\` |
| \`research_expanded\` | Expanding search scope | \`{ new_queries: List<Query> }\` |
| \`exploration_complete\` | Ready to proceed | \`{ spec_inventory, confidence, unknowns }\` |
| \`exploration_timeout\` | Max iterations reached | \`{ partial_results, caveats }\` |
## Example Usage
\`\`\`markdown
## Phase 1: Exploration
Execute @loops/exploration/problem-space.md with:
INPUT:
prompt: "Add real-time collaboration to document editor"
codebase_context: (gathered from initial scan)
CONFIG:
MAX_ITERATIONS: 3
CONFIDENCE_THRESHOLD: 0.8
MAX_SPECS: 5
ON clarification_requested:
PRESENT questions to user
FEED answers back to loop
ON exploration_complete:
PROCEED to authoring phase with spec_inventory
\`\`\`
## Quality Checklist
Before transitioning to next phase, verify:
- [ ] All HIGH-priority unknowns resolved (or documented as assumptions)
- [ ] Stakeholder concerns captured for each perspective
- [ ] Scope boundaries explicitly stated (what's IN and OUT)
- [ ] Spec inventory has clear dependencies
- [ ] Complexity estimates are justified
- [ ] No circular dependencies in spec inventory
`,
metadata: {
"type": "exploration",
"speed": "slow",
"scope": "session",
"description": "Understand the problem space before committing to solution boundaries.",
"interface_version": "loop-interface@1.0",
"inputs": [],
"outputs": [],
"signals": [],
"composition": {}
},
}
},
'meta': {
'composition-patterns': {
content: `# Loop Composition Patterns
How OODA loops combine to form complex workflows.
## Pattern Catalog
### 1. Sequential Composition
Loops execute in order, output of one feeds input of next.
\`\`\`
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Loop A │────▶│ Loop B │────▶│ Loop C │
└─────────┘ └─────────┘ └─────────┘
output_a ═══▶ input_b
output_b ═══▶ input_c
\`\`\`
**Use when:**
- Phases have clear handoff points
- Later phases depend on earlier outputs
- Order matters for correctness
**Example:**
\`\`\`markdown
## Workflow: Spec Design
1. Execute @loops/exploration/problem-space.md
OUTPUT → spec_inventory
2. For each spec in spec_inventory:
Execute @loops/authoring/spec-drafting.md
INPUT ← spec from inventory
OUTPUT → draft_spec
3. For each requirement in draft_spec:
Execute @loops/refinement/requirement-quality.md
INPUT ← requirement
OUTPUT → refined_requirement
\`\`\`
---
### 2. Nested Composition
Inner loop executes within each iteration of outer loop.
\`\`\`
┌─────────────────────────────────────────┐
│ Outer Loop │
│ ┌─────────────────────────────────┐ │
│ │ OBSERVE │ │
│ └─────────────────────────────────┘ │
│ ┌─────────────────────────────────┐ │
│ │ ORIENT │ │
│ └─────────────────────────────────┘ │
│ ┌─────────────────────────────────┐ │
│ │ DECIDE │ │
│ └─────────────────────────────────┘ │
│ ┌─────────────────────────────────┐ │
│ │ ACT │ │
│ │ ┌───────────────────────────┐ │ │
│ │ │ Inner Loop │ │ │
│ │ │ O → O → D → A │ │ │
│ │ │ ↺ │ │ │
│ │ └───────────────────────────┘ │ │
│ └─────────────────────────────────┘ │
│ ↺ │
└─────────────────────────────────────────┘
\`\`\`
**Use when:**
- Each outer iteration needs detailed inner work
- Inner loop results affect outer loop state
- Different cycle speeds (slow outer, fast inner)
**Example:**
\`\`\`markdown
## Workflow: Spec Authoring with Refinement
Execute @loops/authoring/spec-drafting.md:
On each section draft:
Execute @loops/refinement/requirement-quality.md
Until section_quality_score >= 0.85
\`\`\`
---
### 3. Parallel Composition
Multiple loops execute concurrently on independent items.
\`\`\`
┌─────────┐
┌───▶│ Loop A₁ │───┐
│ └─────────┘ │
┌─────────┐ │ ┌─────────┐ │ ┌─────────┐
│ Splitter│────┼───▶│ Loop A₂ │───┼───▶│ Merger │
└─────────┘ │ └─────────┘ │ └─────────┘
│ ┌─────────┐ │
└───▶│ Loop Aₙ │───┘
└─────────┘
\`\`\`
**Use when:**
- Items are independent (no shared state)
- Order doesn't matter
- Throughput matters
**Example:**
\`\`\`markdown
## Workflow: Multi-Spec Refinement
Split specs into independent set
For each spec IN PARALLEL:
Execute @loops/refinement/requirement-quality.md
Merge refined specs into final output
\`\`\`
**Constraints:**
- Loops must be marked \`Parallelizable: yes\`
- No shared mutable state
- Results must be mergeable
---
### 4. Conditional Composition
Loop selection based on context.
\`\`\`
┌─────────┐
┌───▶│ Loop A │
│ └─────────┘
│ condition_a
┌────────┴───────┐
│ Dispatcher │
└────────┬───────┘
│ condition_b
│ ┌─────────┐
└───▶│ Loop B │
└─────────┘
\`\`\`
**Use when:**
- Different situations need different loops
- Optimization for specific cases
- Graceful degradation paths
**Example:**
\`\`\`markdown
## Workflow: Adaptive Exploration
Assess prompt complexity:
IF complexity == "low":
Execute @loops/exploration/codebase-discovery.md (fast path)
ELIF complexity == "high":
Execute @loops/exploration/problem-space.md (thorough path)
ELIF has_external_dependencies:
Execute @loops/exploration/domain-research.md (research path)
\`\`\`
---
### 5. Recursive Composition
Loop spawns child instances of itself.
\`\`\`
┌───────────────────────────────────────────────┐
│ Loop A (depth=0) │
│ │
│ DECIDE: decompose? │
│ │ │
│ ├─ yes ──▶ spawn Loop A (depth=1) │
│ │ │ │
│ │ ├─ spawn Loop A (depth=2) │
│ │ │ │
│ │ └─ spawn Loop A (depth=2) │
│ │ │
│ └─ no ───▶ process directly │
│ │
└───────────────────────────────────────────────┘
\`\`\`
**Use when:**
- Problem naturally decomposes into sub-problems
- Divide-and-conquer strategies
- Hierarchical structures
**Example:**
\`\`\`markdown
## Workflow: Recursive Problem Decomposition
Execute @loops/exploration/problem-space.md:
DECIDE phase:
IF scope_estimate.complexity == "very_high":
FOR each sub_problem in decomposition:
RECURSE with sub_problem as input
MERGE sub_problem outputs
ELSE:
PROCEED with direct processing
MAX_DEPTH: 3
\`\`\`
**Constraints:**
- Must have termination condition (max depth, base case)
- Results must be mergeable across levels
- Guard against infinite recursion
---
### 6. Pipeline with Feedback
Sequential with backward signals for correction.
\`\`\`
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Loop A │────▶│ Loop B │────▶│ Loop C │
└────▲────┘ └────▲────┘ └─────────┘
│ │
│ feedback │ feedback
│ │
└───────────────┴──────────────────────
\`\`\`
**Use when:**
- Later stages discover issues in earlier work
- Iterative refinement across phases
- Quality gates that may reject
**Example:**
\`\`\`markdown
## Workflow: Spec Design with Feedback
1. Execute @loops/exploration/problem-space.md
OUTPUT → spec_inventory
2. Execute @loops/authoring/spec-drafting.md
INPUT ← spec_inventory
OUTPUT → draft_specs
ON_SIGNAL inconsistency_detected:
FEEDBACK to exploration loop
RE-EXECUTE exploration with new context
3. Execute @loops/verification/integration-test.md
INPUT ← draft_specs
ON_SIGNAL verification_failed:
FEEDBACK to authoring loop
RE-EXECUTE authoring for failed specs
\`\`\`
---
### 7. Supervisor Pattern
Meta-loop monitors and controls child loops.
\`\`\`
┌─────────────────────────────────────────────────┐
│ Supervisor Loop │
│ │
│ OBSERVE: monitor child loop signals │
│ ORIENT: assess progress, detect spirals │
│ DECIDE: continue, intervene, terminate │
│ ACT: adjust child loop parameters │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Child A │ │ Child B │ │ Child C │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └─────────────┼─────────────┘ │
│ │ │
│ signals/heartbeats │
└─────────────────────────────────────────────────┘
\`\`\`
**Use when:**
- Need global coordination
- Resource/budget management
- Spiral/deadlock detection
**Example:**
\`\`\`markdown
## Workflow: Orchestrated Spec Implementation
Execute @loops/orchestration/queue-processor.md as SUPERVISOR:
MANAGE child loops:
- @loops/authoring/code-generation.md (per spec)
- @loops/verification/integration-test.md (per milestone)
CONSTRAINTS:
- budget_remaining > 0
- commitment_level < 5
- no spiral_detected
ON spiral_detected:
INCREMENT commitment_level
CONSTRAIN child loop options
\`\`\`
---
## Composition Visualization
### Spec Designer Composition
\`\`\`
┌──────────────────────────────────────────────────────────────────┐
│ /spec-designer │
│ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ @loops/exploration/problem-space.md │ │
│ │ │ │
│ │ OBSERVE → ORIENT → DECIDE → ACT │ │
│ │ │ │ │
│ │ ┌────────────┼────────────┐ │ │
│ │ ▼ ▼ ▼ │ │
│ │ CLARIFY DECOMPOSE PROCEED │ │
│ │ │ │ │ │ │
│ │ ▼ ▼ ▼ │ │
│ │ [user] [recurse] [continue] │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ FOR each spec IN spec_inventory: │ │
│ │ │ │
│ │ ┌──────────────────────────────────────────────────────┐ │ │
│ │ │ @loops/authoring/spec-drafting.md │ │ │
│ │ │ │ │ │
│ │ │ OBSERVE → ORIENT → DECIDE → ACT │ │ │
│ │ │ │ │ │ │
│ │ │ ▼ │ │ │
│ │ │ ┌──────────────────────────────────────────────┐ │ │ │
│ │ │ │ FOR each requirement: │ │ │ │
│ │ │ │ │ │ │ │
│ │ │ │ @loops/refinement/requirement-quality.md │ │ │ │
│ │ │ │ │ │ │ │
│ │ │ │ OBSERVE → ORIENT → DECIDE → ACT │ │ │ │
│ │ │ │ ↺ │ │ │ │
│ │ │ └──────────────────────────────────────────────┘ │ │ │
│ │ └──────────────────────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ @loops/verification/acceptance-gate.md │ │
│ │ │ │
│ │ Validate all specs meet quality threshold │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ [OUTPUT: .specs/] │
└──────────────────────────────────────────────────────────────────┘
\`\`\`
### Spec Orchestrator Composition
\`\`\`
┌──────────────────────────────────────────────────────────────────┐
│ /spec-orchestrator │
│ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ SUPERVISOR: @loops/orchestration/queue-processor.md │ │
│ │ │ │
│ │ Monitors: budget, commitment_level, spiral_detection │ │
│ │ │ │
│ │ ┌──────────────────────────────────────────────────────┐ │ │
│ │ │ @loops/orchestration/dependency-resolver.md │ │ │
│ │ │ │ │ │
│ │ │ Topological sort → implementation_queue │ │ │
│ │ └──────────────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌──────────────────────────────────────────────────────┐ │ │
│ │ │ FOR each spec IN implementation_queue: │ │ │
│ │ │ │ │ │
│ │ │ ┌────────────────────────────────────────────────┐ │ │ │
│ │ │ │ @loops/authoring/code-generation.md │ │ │ │
│ │ │ │ │ │ │ │
│ │ │ │ NESTED: @loops/orchestration/spiral-detector │ │ │ │
│ │ │ │ │ │ │ │
│ │ │ │ ON spiral_detected → escalate to supervisor │ │ │ │
│ │ │ └────────────────────────────────────────────────┘ │ │ │
│ │ │ │ │ │ │
│ │ │ ▼ │ │ │
│ │ │ ┌────────────────────────────────────────────────┐ │ │ │
│ │ │ │ @loops/verification/integration-test.md │ │ │ │
│ │ │ │ │ │ │ │
│ │ │ │ ON failure → feedback to code-generation │ │ │ │
│ │ │ └────────────────────────────────────────────────┘ │ │ │
│ │ └──────────────────────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
\`\`\`
## Anti-Patterns
### ❌ Circular Dependencies
\`\`\`
Loop A → Loop B → Loop A (infinite loop risk)
\`\`\`
**Fix:** Use feedback signals instead of direct calls
### ❌ State Leakage
\`\`\`
Loop A modifies global state → Loop B reads unexpectedly
\`\`\`
**Fix:** Explicit input/output contracts only
### ❌ Unbounded Recursion
\`\`\`
Loop A spawns Loop A without depth limit
\`\`\`
**Fix:** Always include MAX_DEPTH constraint
### ❌ Parallel State Mutation
\`\`\`
Loop A₁ and Loop A₂ both write to shared state
\`\`\`
**Fix:** Use merge function, not shared state
`,
metadata: {
"type": "meta",
"speed": "reference",
"scope": "meta",
"description": "How OODA loops combine to form complex workflows.",
"interface_version": "1",
"inputs": [],
"outputs": [],
"signals": [],
"composition": {}
},
},
'loop-interface': {
content: `# Loop Interface Specification
Standard contract for OODA loop building blocks.
## Interface Contract
Every loop building block MUST define the following sections:
\`\`\`markdown
# [Loop Name]
[One-line description of loop purpose]
## Classification
- **Type**: exploration | authoring | refinement | verification | orchestration
- **Speed**: slow (~minutes) | medium (~30s-2min) | fast (~5-15s) | varies (supervisory)
- **Scope**: session | collection | document | item | line
## Interface
### Inputs
| Name | Type | Required | Description |
|------|------|----------|-------------|
| input_name | type | yes/no | What this input provides |
### Outputs
| Name | Type | Description |
|------|------|-------------|
| output_name | type | What this output contains |
### State (if stateful)
| Field | Type | Description |
|-------|------|-------------|
| state_field | type | What this state tracks |
## OODA Phases
### OBSERVE
[What information is gathered]
### ORIENT
[How information is interpreted/patterned]
### DECIDE
[Decision tree or logic for choosing action]
### ACT
[Actions taken and their effects]
## Termination Conditions
- **Success**: [When loop exits successfully]
- **Failure**: [When loop exits with failure]
- **Timeout**: [Maximum iterations or time]
## Composition
### Can contain (nested loops)
- [list of loop types this can contain]
### Can be contained by
- [list of loop types that can contain this]
### Parallelizable
- yes | no | conditional (explain)
## Signals Emitted
| Signal | When | Payload |
|--------|------|---------|
| signal_name | trigger condition | data structure |
\`\`\`
## Interface Semantics
### Input/Output Types
Standard types for loop interfaces:
\`\`\`typescript
// Primitives
type Text = string
type Number = number
type Boolean = boolean
type Score = number // 0.0 to 1.0
// Collections
type List<T> = T[]
type Map<K, V> = Record<K, V>
type Set<T> = T[] // unique items
// Domain types
type Requirement = {
id: string
text: string
priority: "must" | "should" | "could"
acceptance_criteria: string[]
}
type Spec = {
name: string
path: string
requirements: Requirement[]
status: "draft" | "review" | "approved"
}
type Unknown = {
question: string
importance: "high" | "medium" | "low"
resolved: boolean
answer?: string
}
type PatternMatch = {
pattern: string
location: string
fit_score: Score
trade_offs: string[]
}
type StakeholderConcern = {
stakeholder: "developer" | "user" | "operator" | "security"
concern: string
severity: "high" | "medium" | "low"
}
\`\`\`
### State Management
Loops can be:
1. **Stateless**: Pure function from inputs to outputs
2. **Session-stateful**: State persists across iterations within a session
3. **Persistent-stateful**: State persists across sessions (via state files)
State storage convention:
\`\`\`
.loop-state/
└── [workflow-name]/
└── [loop-name]/
├── state.json # Current state
└── history.jsonl # State history (append-only)
\`\`\`
### Signal Protocol
Loops communicate via signals:
\`\`\`typescript
type Signal = {
source: string // Loop that emitted
type: string // Signal type
timestamp: string // ISO8601
payload: unknown // Signal-specific data
}
// Standard signals
type IterationComplete = Signal & {
type: "iteration_complete"
payload: { iteration: number, progress: Score }
}
type DecisionMade = Signal & {
type: "decision_made"
payload: { decision: string, confidence: Score, rationale: string }
}
type Escalation = Signal & {
type: "escalation"
payload: { reason: string, context: unknown }
}
type LoopTerminated = Signal & {
type: "loop_terminated"
payload: { reason: "success" | "failure" | "timeout", outputs: unknown }
}
\`\`\`
## Composition Rules
### Nesting Depth
Maximum recommended nesting: 3 levels
\`\`\`
Outer Loop (orchestration)
└── Middle Loop (authoring)
└── Inner Loop (refinement)
\`\`\`
### Data Flow
Data flows DOWN (parent → child) via inputs
Data flows UP (child → parent) via outputs and signals
\`\`\`
Parent Loop
│
├── INPUT ──────► Child Loop
│ │
│ ▼
◄── OUTPUT/SIGNAL ────┘
\`\`\`
### Lifecycle Hooks
Loops MAY implement lifecycle hooks:
\`\`\`markdown
## Lifecycle Hooks
### on_enter
[Called when loop starts]
### on_iteration_start
[Called at start of each iteration]
### on_iteration_end
[Called at end of each iteration]
### on_exit
[Called when loop terminates]
### on_error
[Called on unhandled error]
\`\`\`
## Versioning
Loop interfaces are versioned using semver:
- **Major**: Breaking interface changes
- **Minor**: New optional inputs/outputs
- **Patch**: Documentation, bug fixes
Version declared in header:
\`\`\`markdown
# Loop Name
**Version**: 1.2.0
**Interface**: loop-interface@1.0
\`\`\`
## Validation
Workflows can validate loop usage:
\`\`\`markdown
## Validation Rules
- All required inputs must be provided
- Output types must match declared types
- Nested loops must be in "can contain" list
- Termination conditions must be satisfiable
\`\`\`
`,
metadata: {
"type": "meta",
"speed": "reference",
"scope": "meta",
"description": "Standard contract for OODA loop building blocks.",
"interface_version": "1",
"inputs": [],
"outputs": [],
"signals": [],
"composition": {}
},
}
},
'orchestration': {
'dependency-resolver': {
content: `# Dependency Resolver Loop
Analyze dependencies and determine optimal processing order.
**Version**: 1.0.0
**Interface**: loop-interface@1.0
## Classification
- **Type**: orchestration
- **Speed**: fast (~5-30s)
- **Scope**: collection
## Interface
### Inputs
| Name | Type | Required | Description |
|------|------|----------|-------------|
| items | List<Item> | yes | Items with potential dependencies |
| dependency_types | List<DepType> | no | Types of dependencies to analyze |
| resolution_strategy | Strategy | no | How to handle cycles/conflicts |
### Outputs
| Name | Type | Description |
|------|------|-------------|
| ordered_items | List<Item> | Topologically sorted items |
| dependency_graph | Graph | Full dependency graph |
| cycles | List<Cycle> | Detected circular dependencies |
| parallel_groups | List<Group> | Items that can run in parallel |
| critical_path | List<Item> | Longest dependency chain |
| resolutions | List<Resolution> | Applied cycle resolutions |
### State
| Field | Type | Description |
|-------|------|-------------|
| graph | Graph | Dependency graph being built |
| visited | Set<ItemId> | Items already processed |
## Types
\`\`\`typescript
type DepType =
| "explicit" // Declared dependencies
| "implicit" // Inferred from content
| "data_flow" // Data dependencies
| "temporal" // Time-based ordering
type Strategy =
| "fail_on_cycle" // Error if cycles detected
| "break_weakest" // Break cycle at weakest link
| "merge_cycle" // Merge cyclic items
| "user_resolution" // Ask user to resolve
type Cycle = {
items: List<ItemId>
edges: List<Edge>
strength: number // How tightly coupled
}
type Graph = {
nodes: Map<ItemId, Node>
edges: List<Edge>
add_node(item: Item): void
add_edge(from: ItemId, to: ItemId, type: DepType): void
get_dependencies(id: ItemId): List<ItemId>
get_dependents(id: ItemId): List<ItemId>
}
\`\`\`
## OODA Phases
### OBSERVE
Build dependency graph:
\`\`\`
1. EXTRACT explicit dependencies:
FOR item in items:
graph.add_node(item)
# Parse declared dependencies
IF item.dependencies:
FOR dep in item.dependencies:
target = find_item(dep, items)
IF target:
graph.add_edge(
from: item.id,
to: target.id,
type: "explicit",
strength: 1.0
)
2. INFER implicit dependencies:
IF "implicit" in dependency_types:
FOR item in items:
# Analyze content for references
references = extract_references(item.content)
FOR ref in references:
target = resolve_reference(ref, items)
IF target AND target.id != item.id:
# Check if edge already exists
IF not graph.has_edge(item.id, target.id):
graph.add_edge(
from: item.id,
to: target.id,
type: "implicit",
strength: 0.5 # Lower weight for inferred
)
3. ANALYZE data flow:
IF "data_flow" in dependency_types:
FOR item in items:
# What data does this item produce?
outputs = extract_outputs(item)
# What data does this item consume?
inputs = extract_inputs(item)
FOR input in inputs:
# Find producer
producer = find_producer(input, items)
IF producer:
graph.add_edge(
from: item.id,
to: producer.id,
type: "data_flow",
data: input
)
4. CHECK temporal constraints:
IF "temporal" in dependency_types:
FOR item in items:
# Parse temporal markers
temporal = extract_temporal(item)
# "after X", "before Y", "with Z"
FOR constraint in temporal:
target = find_item(constraint.reference, items)
IF target:
IF constraint.type == "after":
graph.add_edge(item.id, target.id, type: "temporal")
ELIF constraint.type == "before":
graph.add_edge(target.id, item.id, type: "temporal")
SIGNALS:
graph: complete dependency graph
edge_count: number of dependencies found
edge_types: breakdown by type
\`\`\`
### ORIENT
Analyze graph structure:
\`\`\`
1. DETECT cycles:
cycles = []
# Tarjan's algorithm for strongly connected components
sccs = tarjan_scc(graph)
FOR scc in sccs:
IF len(scc) > 1:
# This is a cycle
cycle_edges = get_internal_edges(scc, graph)
cycle_strength = avg(e.strength for e in cycle_edges)
cycles.append({
items: scc,
edges: cycle_edges,
strength: cycle_strength
})
2. CALCULATE critical path:
IF len(cycles) == 0:
# Standard critical path calculation
critical_path = find_longest_path(graph)
ELSE:
# Approximate with cycles present
critical_path = find_longest_path_with_cycles(graph, cycles)
3. IDENTIFY parallel groups:
# Items with no dependencies between them
parallel_groups = []
# Get items at each depth level
levels = calculate_depth_levels(graph)
FOR level in levels:
level_items = items_at_level(level)
# Group items that can truly run in parallel
independent = find_independent_subsets(level_items, graph)
FOR group in independent:
IF len(group) > 1:
parallel_groups.append({
items: group,
level: level
})
4. ASSESS graph health:
metrics = {
node_count: len(graph.nodes),
edge_count: len(graph.edges),
density: edge_count / (node_count * (node_count - 1)),
max_depth: len(levels),
parallelism_potential: sum(len(g.items) for g in parallel_groups),
cycle_count: len(cycles)
}
\`\`\`
### DECIDE
Choose resolution strategy for cycles:
\`\`\`
1. IF len(cycles) == 0:
decision = "PROCEED"
rationale = "No cycles detected"
2. ELIF resolution_strategy == "fail_on_cycle":
decision = "FAIL"
rationale = f"Detected {len(cycles)} cycles"
3. ELIF resolution_strategy == "break_weakest":
decision = "BREAK_CYCLES"
FOR cycle in cycles:
# Find weakest edge to break
weakest = min(cycle.edges, by=strength)
cycle.resolution = {
action: "remove_edge",
edge: weakest,
rationale: "Weakest link in cycle"
}
4. ELIF resolution_strategy == "merge_cycle":
decision = "MERGE_CYCLES"
FOR cycle in cycles:
cycle.resolution = {
action: "merge",
new_item: merge_items(cycle.items),
rationale: "Tightly coupled items merged"
}
5. ELIF resolution_strategy == "user_resolution":
decision = "ESCALATE"
FOR cycle in cycles:
cycle.resolution = {
action: "user_decision",
options: [
f"Break at {e.from} -> {e.to}" for e in cycle.edges
] + ["Merge all items"]
}
\`\`\`
### ACT
Apply resolutions and generate ordering:
\`\`\`
1. APPLY cycle resolutions:
IF decision in ["BREAK_CYCLES", "MERGE_CYCLES"]:
FOR cycle in cycles:
IF cycle.resolution.action == "remove_edge":
graph.remove_edge(cycle.resolution.edge)
resolutions.append({
cycle: cycle.items,
action: "edge_removed",
edge: cycle.resolution.edge
})
ELIF cycle.resolution.action == "merge":
merged = merge_items(cycle.items)
# Replace cycle items with merged item
FOR item in cycle.items:
graph.remove_node(item)
graph.add_node(merged)
# Redirect edges
FOR item in cycle.items:
FOR dep in graph.get_dependencies(item):
IF dep not in cycle.items:
graph.add_edge(merged.id, dep)
FOR dependent in graph.get_dependents(item):
IF dependent not in cycle.items:
graph.add_edge(dependent, merged.id)
resolutions.append({
cycle: cycle.items,
action: "merged",
result: merged.id
})
# Update items list
items = [i for i in items if i.id not in cycle.items]
items.append(merged)
2. ELIF decision == "ESCALATE":
EMIT resolution_needed {
cycles: cycles,
options: [c.resolution.options for c in cycles]
}
# Wait for user decision
user_decisions = WAIT_FOR user_input
FOR cycle, user_decision in zip(cycles, user_decisions):
apply_user_resolution(cycle, user_decision)
3. GENERATE topological order:
IF decision == "FAIL":
ordered_items = None
EMIT resolution_failed {
cycles: cycles,
message: "Cannot order items with circular dependencies"
}
ELSE:
# Kahn's algorithm for topological sort
ordered_items = []
in_degree = {node: len(graph.get_dependencies(node)) for node in graph.nodes}
queue = [node for node, degree in in_degree.items() if degree == 0]
WHILE queue:
node = queue.pop(0)
ordered_items.append(find_item(node, items))
FOR dependent in graph.get_dependents(node):
in_degree[dependent] -= 1
IF in_degree[dependent] == 0:
queue.append(dependent)
4. EMIT completion:
IF ordered_items:
EMIT dependencies_resolved {
items: len(ordered_items),
cycles_resolved: len(resolutions),
parallel_opportunities: len(parallel_groups),
critical_path_length: len(critical_path)
}
5. RETURN:
RETURN {
ordered_items: ordered_items,
dependency_graph: graph,
cycles: cycles,
parallel_groups: parallel_groups,
critical_path: critical_path,
resolutions: resolutions
}
\`\`\`
## Termination Conditions
- **Success**: All dependencies resolved, topological order produced
- **Failure**: Unresolvable cycles with \`fail_on_cycle\` strategy
- **Timeout**: N/A (fast algorithm)
## Composition
### Can contain (nested loops)
- None (computational loop)
### Can be contained by
- \`orchestration/queue-processor\` (before processing)
- Workflow commands
### Parallelizable
- No (graph algorithms are inherently sequential)
## Signals Emitted
| Signal | When | Payload |
|--------|------|---------|
| \`cycle_detected\` | Cycle found | \`{ cycle_items, strength }\` |
| \`resolution_needed\` | User input required | \`{ cycles, options }\` |
| \`resolution_applied\` | Cycle resolved | \`{ cycle, action }\` |
| \`resolution_failed\` | Cannot resolve | \`{ cycles, message }\` |
| \`dependencies_resolved\` | Complete | \`{ items, parallel, critical_path }\` |
## Graph Algorithms Used
| Algorithm | Purpose | Complexity |
|-----------|---------|------------|
| Tarjan's SCC | Cycle detection | O(V + E) |
| Kahn's Algorithm | Topological sort | O(V + E) |
| Longest Path | Critical path | O(V + E) |
| DFS | Dependency traversal | O(V + E) |
## Example Usage
\`\`\`markdown
## Resolve Spec Dependencies
Execute @loops/orchestration/dependency-resolver.md with:
INPUT:
items: spec_inventory
dependency_types: ["explicit", "implicit"]
resolution_strategy: "break_weakest"
ON cycle_detected:
LOG "Found cycle: {cycle_items}"
ON resolution_applied:
LOG "Resolved by: {action}"
ON dependencies_resolved:
USE ordered_items as implementation_queue
USE parallel_groups for concurrent execution
MONITOR critical_path for bottlenecks
\`\`\`
## Output Example
\`\`\`yaml
ordered_items:
- id: "spec-auth"
order: 1
can_start: immediately
- id: "spec-users"
order: 2
depends_on: ["spec-auth"]
- id: "spec-notifications"
order: 2 # Same level, can parallel
depends_on: ["spec-auth"]
- id: "spec-dashboard"
order: 3
depends_on: ["spec-users", "spec-notifications"]
parallel_groups:
- level: 2
items: ["spec-users", "spec-notifications"]
critical_path:
- "spec-auth" -> "spec-users" -> "spec-dashboard"
length: 3
cycles: [] # None detected
resolutions: [] # None needed
\`\`\`
`,
metadata: {
"type": "orchestration",
"speed": "fast",
"scope": "collection",
"description": "Analyze dependencies and determine optimal processing order.",
"interface_version": "loop-interface@1.0",
"inputs": [],
"outputs": [],
"signals": [],
"composition": {}
},
},
'queue-processor': {
content: `# Queue Processor Loop
Process work items in priority order with resource management and spiral prevention.
**Version**: 1.0.0
**Interface**: loop-interface@1.0
## Classification
- **Type**: orchestration
- **Speed**: varies (supervises child loops)
- **Scope**: session (manages queue across time)
## Interface
### Inputs
| Name | Type | Required | Description |
|------|------|----------|-------------|
| queue | List<WorkItem> | yes | Work items to process |
| budget | Budget | no | Resource constraints |
| child_loop | LoopRef | yes | Loop to execute for each item |
| child_config | Config | no | Configuration for child loop |
### Outputs
| Name | Type | Description |
|------|------|-------------|
| completed | List<WorkItem> | Successfully processed items |
| partial | List<WorkItem> | Partially completed items |
| skipped | List<WorkItem> | Items skipped due to constraints |
| failed | List<WorkItem> | Items that failed |
| budget_used | Budget | Resources consumed |
| final_report | Report | Summary of processing |
### State
| Field | Type | Description |
|-------|------|-------------|
| current_item | WorkItem | Item being processed |
| commitment_level | Number | Constraint escalation (0-5) |
| iteration_history | List<IterationRecord> | Per-item iteration history |
| budget_remaining | Budget | Remaining resources |
## Types
\`\`\`typescript
type WorkItem = {
id: string
name: string
priority: number
dependencies: List<string> // IDs of items this depends on
status: "pending" | "ready" | "in_progress" | "completed" | "partial" | "failed" | "skipped"
allocated_budget: number
metadata: any
}
type Budget = {
total_units: number
time_limit_ms?: number
max_iterations_per_item: number
}
type Report = {
items_processed: number
success_rate: number
budget_efficiency: number
bottlenecks: List<string>
recommendations: List<string>
}
\`\`\`
## OODA Phases
### OBSERVE
Monitor queue and resource state:
\`\`\`
1. ASSESS queue status:
pending = filter(queue, status == "pending")
ready = filter(queue, status == "ready")
blocked = filter(queue,
status == "pending" AND
has_unmet_dependencies
)
in_progress = filter(queue, status == "in_progress")
2. CHECK resource constraints:
budget_remaining = budget.total_units - sum(
item.budget_used for item in queue
if item.status in ["completed", "partial", "failed"]
)
time_remaining = budget.time_limit_ms - elapsed_time()
3. UPDATE dependency status:
FOR item in blocked:
deps = get_dependencies(item, queue)
unmet = filter(deps, status not in ["completed"])
IF len(unmet) == 0:
item.status = "ready"
ready.append(item)
4. COLLECT signals from current processing:
IF current_item:
current_signals = {
progress: current_item.progress,
iterations: current_item.iteration_count,
budget_used: current_item.budget_used,
issues: current_item.issues
}
SIGNALS:
queue_status: {pending, ready, blocked, in_progress}
resources: {budget_remaining, time_remaining}
current_progress: signals from active item
\`\`\`
### ORIENT
Analyze queue dynamics and constraints:
\`\`\`
1. PRIORITIZE ready items:
# Sort by priority, then by dependency count (items that unblock others)
ready.sort(by=[
priority descending,
unblocks_count descending,
allocated_budget ascending # Smaller items first when equal
])
2. DETECT bottlenecks:
bottlenecks = []
# Circular dependencies
cycles = detect_cycles(queue)
IF cycles:
bottlenecks.append({
type: "circular_dependency",
items: cycles
})
# Resource bottlenecks
IF budget_remaining < min(item.allocated_budget for item in ready):
bottlenecks.append({
type: "budget_exhausted",
remaining: budget_remaining,
needed: min_budget_needed
})
# Single item blocking many
FOR item in blocked:
blockers = get_blockers(item)
IF len(get_blocked_by(blockers[0])) > 3:
bottlenecks.append({
type: "critical_path",
blocker: blockers[0],
blocked_count: len(get_blocked_by(blockers[0]))
})
3. ASSESS commitment level triggers:
# Budget depletion warnings
IF budget_remaining < budget.total_units * 0.5:
commitment_level = max(commitment_level, 1)
IF budget_remaining < budget.total_units * 0.25:
commitment_level = max(commitment_level, 2)
# Time pressure
IF time_remaining < time_limit * 0.25:
commitment_level = max(commitment_level, 3)
# Spiral detection from history
IF detect_queue_spiral(iteration_history):
commitment_level = max(commitment_level, 4)
4. DETERMINE queue health:
health_score = calculate_health(
completion_rate: len(completed) / len(queue),
resource_efficiency: budget_used_effectively / budget_used_total,
throughput: items_per_hour,
spiral_risk: spiral_indicators
)
\`\`\`
### DECIDE
Choose next action:
\`\`\`
1. SELECT next item:
IF commitment_level >= 5:
decision = "FORCE_COMPLETE_ALL"
rationale = "Maximum commitment reached"
ELIF len(ready) == 0 AND len(blocked) > 0:
IF has_resolvable_blocks:
decision = "RESOLVE_BLOCKS"
rationale = "Unblock dependent items"
ELSE:
decision = "ESCALATE"
rationale = "Unresolvable dependencies"
ELIF len(ready) == 0 AND len(pending) == 0:
decision = "COMPLETE"
rationale = "Queue empty"
ELIF budget_remaining <= 0:
decision = "BUDGET_EXHAUSTED"
rationale = "No budget remaining"
ELSE:
decision = "PROCESS_NEXT"
next_item = ready[0]
2. APPLY commitment constraints:
IF commitment_level >= 2:
# Hard budget constraint
IF next_item.allocated_budget > budget_remaining:
next_item.allocated_budget = budget_remaining
IF commitment_level >= 3:
# Only incomplete items, reduced iterations
child_config.max_iterations = 1
IF commitment_level >= 4:
# Bug fixes only mode
child_config.mode = "minimal"
\`\`\`
### ACT
Execute processing:
\`\`\`
1. IF decision == "PROCESS_NEXT":
current_item = next_item
current_item.status = "in_progress"
EMIT item_started {
item: current_item.name,
priority: current_item.priority,
budget_allocated: current_item.allocated_budget
}
# Execute child loop for this item
result = Execute child_loop with:
INPUT: current_item.metadata
CONFIG: child_config
ON iteration_complete:
iteration_history.append({
item: current_item.id,
iteration: result.iteration,
progress: result.progress,
budget_used: result.budget_used
})
# Check for spirals
spiral_check = Execute @loops/orchestration/spiral-detector.md with:
iteration_history: item_iterations
current_iteration: latest
IF spiral_check.severity == "critical":
EMIT spiral_detected { item: current_item.name }
IF commitment_level < 5:
commitment_level += spiral_check.commitment_delta
# Process result
IF result.success:
current_item.status = "completed"
completed.append(current_item)
EMIT item_completed {
item: current_item.name,
budget_used: result.budget_used
}
ELIF result.partial:
current_item.status = "partial"
partial.append(current_item)
EMIT item_partial {
item: current_item.name,
completion: result.completion_percentage
}
ELSE:
current_item.status = "failed"
failed.append(current_item)
EMIT item_failed {
item: current_item.name,
error: result.error
}
# Update budget
budget_remaining -= result.budget_used
current_item = None
# Continue to next item
RETURN to OBSERVE
2. IF decision == "RESOLVE_BLOCKS":
# Try to resolve circular dependencies
FOR cycle in bottlenecks.circular_dependencies:
resolution = attempt_cycle_resolution(cycle)
IF resolution:
apply_resolution(resolution)
RETURN to OBSERVE
3. IF decision == "FORCE_COMPLETE_ALL":
# Accept current state of all items
FOR item in queue:
IF item.status == "in_progress":
item.status = "partial"
partial.append(item)
ELIF item.status in ["pending", "ready"]:
item.status = "skipped"
skipped.append(item)
EMIT force_complete {
reason: "commitment_level_5",
partial: len(partial),
skipped: len(skipped)
}
PROCEED to finalization
4. IF decision == "BUDGET_EXHAUSTED":
# Mark remaining as skipped
FOR item in ready + pending:
item.status = "skipped"
skipped.append(item)
EMIT budget_exhausted {
completed: len(completed),
skipped: len(skipped)
}
PROCEED to finalization
5. IF decision == "COMPLETE":
# All items processed
PROCEED to finalization
6. FINALIZATION:
final_report = {
items_processed: len(completed) + len(partial),
success_rate: len(completed) / len(queue),
budget_efficiency: value_delivered / budget_used,
bottlenecks: [b.type for b in bottlenecks],
recommendations: generate_recommendations(
bottlenecks,
failed,
iteration_history
)
}
EMIT queue_complete {
completed: len(completed),
partial: len(partial),
failed: len(failed),
skipped: len(skipped)
}
RETURN {
completed, partial, skipped, failed,
budget_used: budget.total_units - budget_remaining,
final_report
}
\`\`\`
## Termination Conditions
- **Success**: All items completed or acceptable partial completion
- **Failure**: Critical items failed with no recovery
- **Timeout**: \`commitment_level >= 5\` triggers force complete
## Composition
### Can contain (nested loops)
- Any child loop specified in \`child_loop\` input
- \`orchestration/spiral-detector\` (inline monitoring)
### Can be contained by
- Workflow commands (\`/spec-orchestrator\`)
- Meta-orchestration loops
### Parallelizable
- Conditional: Independent items can be processed in parallel
- No: Items with dependencies must be sequential
## Signals Emitted
| Signal | When | Payload |
|--------|------|---------|
| \`item_started\` | Processing begins | \`{ item, priority, budget }\` |
| \`item_completed\` | Item succeeds | \`{ item, budget_used }\` |
| \`item_partial\` | Partial completion | \`{ item, completion }\` |
| \`item_failed\` | Item fails | \`{ item, error }\` |
| \`spiral_detected\` | Spiral in item | \`{ item, pattern }\` |
| \`budget_warning\` | Budget threshold | \`{ remaining, threshold }\` |
| \`budget_exhausted\` | No budget left | \`{ completed, skipped }\` |
| \`force_complete\` | Level 5 triggered | \`{ reason, partial, skipped }\` |
| \`queue_complete\` | All done | \`{ summary_stats }\` |
## Commitment Levels
| Level | Trigger | Effect |
|-------|---------|--------|
| 0 | Default | Full flexibility |
| 1 | 50% budget used | Soft warnings |
| 2 | 75% budget used | Hard budget constraint |
| 3 | 75% time used | Reduced iterations |
| 4 | Spiral detected | Minimal mode |
| 5 | Multiple triggers | Force complete all |
## Example Usage
\`\`\`markdown
## Spec Implementation Queue
Execute @loops/orchestration/queue-processor.md with:
INPUT:
queue: spec_inventory (from spec-designer)
budget: { total_units: 100, max_iterations_per_item: 3 }
child_loop: @loops/authoring/code-generation.md
child_config: { depth: "standard" }
ON item_started:
LOG "Starting: {item}"
ON item_completed:
UNBLOCK dependent items
RUN integration tests
ON spiral_detected:
LOG "Spiral in {item}, constraining..."
ON budget_warning:
NOTIFY user of budget status
ON queue_complete:
GENERATE final report
PROCEED to integration phase
\`\`\`
`,
metadata: {
"type": "orchestration",
"speed": "varies",
"scope": "session",
"description": "Process work items in priority order with resource management and spiral prevention.",
"interface_version": "loop-interface@1.0",
"inputs": [],
"outputs": [],
"signals": [],
"composition": {}
},
},
'spiral-detector': {
content: `# Spiral Detection Loop
Detect and prevent implementation spirals through pattern recognition.
**Version**: 1.0.0
**Interface**: loop-interface@1.0
## Classification
- **Type**: orchestration
- **Speed**: fast (~1-5s per check)
- **Scope**: session (monitors across iterations)
## Interface
### Inputs
| Name | Type | Required | Description |
|------|------|----------|-------------|
| iteration_history | List<IterationRecord> | yes | History of past iterations |
| current_iteration | IterationRecord | yes | Current iteration data |
| thresholds | SpiralThresholds | no | Detection thresholds |
### Outputs
| Name | Type | Description |
|------|------|-------------|
| spirals_detected | List<SpiralPattern> | Detected spiral patterns |
| severity | "none" \\| "warning" \\| "critical" | Overall severity |
| recommendation | Recommendation | Suggested action |
| commitment_delta | Number | Suggested commitment level change |
### State
| Field | Type | Description |
|-------|------|-------------|
| pattern_counts | Map<Pattern, Count> | How many times each pattern seen |
| warning_count | Number | Cumulative warnings issued |
## Types
\`\`\`typescript
type IterationRecord = {
iteration: number
files_modified: Set<string>
checklist_score: number // 0-1
time_spent_ms: number
scope_baseline: Set<string> // files that SHOULD be modified
}
type SpiralPattern =
| "OSCILLATION" // Same files touched repeatedly
| "SCOPE_CREEP" // Modifying files outside scope
| "DIMINISHING_RETURNS" // Low progress per iteration
| "THRASHING" // High effort, zero/negative progress
| "GOLD_PLATING" // Perfecting already-complete work
type SpiralThresholds = {
oscillation_file_count: number // default: 3
oscillation_iteration_span: number // default: 3
scope_creep_tolerance: number // default: 0 extra files
diminishing_returns_delta: number // default: 0.1 (10%)
diminishing_returns_span: number // default: 2 iterations
thrashing_time_multiplier: number // default: 2x
}
type Recommendation = {
action: "continue" | "constrain" | "escalate" | "force_complete"
rationale: string
constraints?: string[]
}
\`\`\`
## OODA Phases
### OBSERVE
Collect signals from iteration history:
\`\`\`
GIVEN: iteration_history, current_iteration
1. EXTRACT file modification patterns:
files_per_iteration = [
iter.files_modified for iter in iteration_history + [current_iteration]
]
file_frequency = count occurrences of each file across iterations
2. EXTRACT progress patterns:
progress_deltas = [
iteration_history[i+1].checklist_score - iteration_history[i].checklist_score
for i in range(len(iteration_history) - 1)
]
current_delta = current_iteration.checklist_score - iteration_history[-1].checklist_score
3. EXTRACT time patterns:
time_per_iteration = [iter.time_spent_ms for iter in iteration_history]
avg_time = mean(time_per_iteration)
current_time = current_iteration.time_spent_ms
4. EXTRACT scope patterns:
scope_baseline = current_iteration.scope_baseline
actual_files = current_iteration.files_modified
out_of_scope = actual_files - scope_baseline
SIGNALS:
file_frequency: Map<file, count>
progress_deltas: List<number>
current_delta: number
avg_time: number
current_time: number
out_of_scope_files: Set<string>
\`\`\`
### ORIENT
Apply pattern detection heuristics:
\`\`\`
spirals_detected = []
1. OSCILLATION detection:
"""
Trigger: Same 3+ files modified in iterations N, N-1, N-2
Meaning: Flip-flopping between states, not converging
"""
IF len(iteration_history) >= thresholds.oscillation_iteration_span:
recent_iterations = last N iterations
common_files = intersection(
iter.files_modified for iter in recent_iterations
)
IF len(common_files) >= thresholds.oscillation_file_count:
spirals_detected.append({
pattern: "OSCILLATION",
evidence: {files: common_files, iterations: recent_iterations},
severity: "warning" if pattern_counts["OSCILLATION"] == 0 else "critical"
})
2. SCOPE_CREEP detection:
"""
Trigger: Modifying files not in scope_baseline
Meaning: Expanding beyond intended changes
"""
IF len(out_of_scope_files) > thresholds.scope_creep_tolerance:
spirals_detected.append({
pattern: "SCOPE_CREEP",
evidence: {files: out_of_scope_files},
severity: "warning"
})
3. DIMINISHING_RETURNS detection:
"""
Trigger: <10% progress in each of last 2 iterations
Meaning: Stuck, not making meaningful progress
"""
IF len(progress_deltas) >= thresholds.diminishing_returns_span:
recent_deltas = progress_deltas[-thresholds.diminishing_returns_span:]
IF all(delta < thresholds.diminishing_returns_delta for delta in recent_deltas):
spirals_detected.append({
pattern: "DIMINISHING_RETURNS",
evidence: {deltas: recent_deltas, threshold: thresholds.diminishing_returns_delta},
severity: "warning" if current_iteration.checklist_score < 0.8 else "critical"
})
4. THRASHING detection:
"""
Trigger: 2x time spent with zero/negative progress
Meaning: Spinning wheels, making things worse
"""
IF current_time > (avg_time * thresholds.thrashing_time_multiplier):
IF current_delta <= 0:
spirals_detected.append({
pattern: "THRASHING",
evidence: {
current_time: current_time,
avg_time: avg_time,
progress: current_delta
},
severity: "critical"
})
5. GOLD_PLATING detection:
"""
Trigger: Checklist at 100% but still making changes
Meaning: Over-engineering, perfectionism
"""
IF current_iteration.checklist_score >= 1.0:
IF len(current_iteration.files_modified) > 0:
spirals_detected.append({
pattern: "GOLD_PLATING",
evidence: {
score: current_iteration.checklist_score,
files_touched: current_iteration.files_modified
},
severity: "warning"
})
6. CALCULATE overall severity:
IF any(s.severity == "critical" for s in spirals_detected):
severity = "critical"
ELIF len(spirals_detected) > 0:
severity = "warning"
ELSE:
severity = "none"
\`\`\`
### DECIDE
Determine recommended action:
\`\`\`
DECISION MATRIX:
severity == "none":
recommendation = {
action: "continue",
rationale: "No spiral patterns detected"
}
commitment_delta = 0
severity == "warning" AND warning_count < 2:
recommendation = {
action: "constrain",
rationale: f"Detected: {[s.pattern for s in spirals_detected]}",
constraints: generate_constraints(spirals_detected)
}
commitment_delta = +1
warning_count += 1
severity == "warning" AND warning_count >= 2:
recommendation = {
action: "escalate",
rationale: "Multiple warnings, user decision needed"
}
commitment_delta = +1
severity == "critical":
recommendation = {
action: "escalate",
rationale: f"Critical spiral: {[s.pattern for s in spirals_detected]}"
}
commitment_delta = +2
CONSTRAINT GENERATION:
IF "OSCILLATION" detected:
constraints.append("Lock files that have oscillated; no further changes allowed")
IF "SCOPE_CREEP" detected:
constraints.append(f"Revert changes to: {out_of_scope_files}")
constraints.append("Only modify files in scope_baseline")
IF "DIMINISHING_RETURNS" detected:
constraints.append("Accept current state as complete")
constraints.append("Move to next item in queue")
IF "THRASHING" detected:
constraints.append("Stop current approach")
constraints.append("Request alternative strategy")
IF "GOLD_PLATING" detected:
constraints.append("Checklist complete; no further refinement")
\`\`\`
### ACT
Execute recommendation:
\`\`\`
1. EMIT signals based on severity:
IF severity == "none":
EMIT spiral_check_passed {iteration: current_iteration.iteration}
ELIF severity == "warning":
EMIT spiral_warning {
patterns: spirals_detected,
recommendation: recommendation,
commitment_level: current_commitment + commitment_delta
}
ELIF severity == "critical":
EMIT spiral_critical {
patterns: spirals_detected,
recommendation: recommendation,
commitment_level: current_commitment + commitment_delta
}
2. UPDATE state:
For each pattern in spirals_detected:
pattern_counts[pattern] += 1
3. RETURN outputs:
RETURN {
spirals_detected,
severity,
recommendation,
commitment_delta
}
\`\`\`
## Termination Conditions
- **Success**: Check complete, outputs returned
- **Failure**: N/A (always produces a result)
- **Timeout**: N/A (fast enough to not need timeout)
## Composition
### Can contain (nested loops)
- None (monitoring loop)
### Can be contained by
- \`orchestration/queue-processor\` (per-iteration monitoring)
- \`authoring/code-generation\` (inline spiral check)
### Parallelizable
- No (sequential by nature, monitors iteration sequence)
## Signals Emitted
| Signal | When | Payload |
|--------|------|---------|
| \`spiral_check_passed\` | No patterns detected | \`{ iteration }\` |
| \`spiral_warning\` | Warning-level patterns | \`{ patterns, recommendation, commitment_level }\` |
| \`spiral_critical\` | Critical patterns | \`{ patterns, recommendation, commitment_level }\` |
## Commitment Level Effects
The spiral detector's \`commitment_delta\` affects the containing orchestrator:
| Level | Effect on Work |
|-------|----------------|
| 0-1 | Full flexibility |
| 2 | Hard budget constraint active |
| 3 | Only incomplete items, no new scope |
| 4 | Bug fixes only |
| 5 | **FORCE COMPLETE** - accept current state |
## Example Usage
\`\`\`markdown
## In Queue Processor
For each iteration:
... do work ...
Execute @loops/orchestration/spiral-detector.md with:
INPUT:
iteration_history: previous_iterations
current_iteration: this_iteration
ON spiral_warning:
APPLY constraints to next iteration
INCREMENT commitment_level
ON spiral_critical:
IF commitment_level >= 4:
FORCE_COMPLETE current item
ELSE:
ESCALATE to user with options:
[C]ontinue with constraints
[A]ccept current state
[S]kip item
[H]alt orchestration
\`\`\`
## Tuning Guidelines
| Situation | Adjust |
|-----------|--------|
| Too many false positives | Increase thresholds |
| Missing real spirals | Decrease thresholds |
| Complex refactors flagging | Increase oscillation_file_count |
| Slow work flagging thrashing | Increase thrashing_time_multiplier |
`,
// Catalog metadata for the spiral-detector loop (shape: LoopMetadata, declared in the file header).
// NOTE(review): inputs/outputs/signals are empty and composition is {} even though the embedded
// markdown above documents interface tables and composition rules — presumably the generator
// does not parse those sections; confirm against scripts/embed-loops.ts.
metadata: {
"type": "orchestration",
"speed": "fast",
"scope": "session",
"description": "Detect and prevent implementation spirals through pattern recognition.",
"interface_version": "loop-interface@1.0",
"inputs": [],
"outputs": [],
"signals": [],
"composition": {}
},
}
},
'refinement': {
'code-quality': {
content: `# Code Quality Loop
Improve code quality through systematic analysis and refinement.
**Version**: 1.0.0
**Interface**: loop-interface@1.0
## Classification
- **Type**: refinement
- **Speed**: fast (~10-30s per file)
- **Scope**: file or function
## Interface
### Inputs
| Name | Type | Required | Description |
|------|------|----------|-------------|
| code | CodeUnit | yes | Code to refine (file, function, or snippet) |
| quality_dimensions | List<Dimension> | no | Which aspects to focus on |
| conventions | List<Convention> | no | Coding conventions to enforce |
| threshold | Score | no | Minimum quality score (default: 0.85) |
### Outputs
| Name | Type | Description |
|------|------|-------------|
| refined_code | CodeUnit | Improved code |
| quality_score | Score | Final quality score |
| improvements | List<Improvement> | Changes made with rationale |
| remaining_issues | List<Issue> | Issues not auto-fixable |
| metrics | QualityMetrics | Detailed quality metrics |
### State
| Field | Type | Description |
|-------|------|-------------|
| iteration | Number | Refinement iteration |
| score_history | List<Score> | Score at each iteration |
| changes_made | List<Change> | All changes across iterations |
## Types
\`\`\`typescript
type Dimension =
| "readability" // Clear, understandable code
| "maintainability" // Easy to modify and extend
| "performance" // Efficient execution
| "security" // Safe from vulnerabilities
| "testability" // Easy to test
| "idiom" // Language-idiomatic patterns
type QualityMetrics = {
complexity: {
cyclomatic: number
cognitive: number
}
size: {
lines: number
functions: number
avg_function_length: number
}
documentation: {
coverage: number
quality: number
}
naming: {
consistency: number
descriptiveness: number
}
}
\`\`\`
## OODA Phases
### OBSERVE
Analyze code quality across dimensions:
\`\`\`
1. PARSE code structure:
ast = parse_to_ast(code)
functions = extract_functions(ast)
classes = extract_classes(ast)
variables = extract_variables(ast)
imports = extract_imports(ast)
comments = extract_comments(ast)
2. MEASURE complexity:
FOR each function in functions:
function.cyclomatic = calculate_cyclomatic(function)
function.cognitive = calculate_cognitive(function)
function.nesting_depth = calculate_max_nesting(function)
overall_complexity = {
cyclomatic: avg(f.cyclomatic for f in functions),
cognitive: avg(f.cognitive for f in functions),
max_nesting: max(f.nesting_depth for f in functions)
}
3. ASSESS readability:
readability_signals = {
avg_line_length: avg(len(line) for line in code.lines),
max_line_length: max(len(line) for line in code.lines),
avg_function_length: avg(len(f.lines) for f in functions),
naming_quality: assess_naming(variables, functions, classes),
comment_ratio: len(comments) / len(code.lines)
}
4. CHECK security:
security_patterns = [
{ pattern: "eval(", severity: "high", issue: "Code injection risk" },
{ pattern: "innerHTML", severity: "medium", issue: "XSS risk" },
{ pattern: "SQL.*\\\\+", severity: "high", issue: "SQL injection" },
{ pattern: "password.*=.*['\\"]", severity: "high", issue: "Hardcoded credential" },
{ pattern: "// TODO.*security", severity: "medium", issue: "Security TODO" }
]
FOR pattern in security_patterns:
matches = grep(code, pattern.pattern)
IF matches:
security_issues.append({pattern, matches})
5. VERIFY conventions:
IF conventions:
FOR convention in conventions:
violations = check_convention(code, convention)
IF violations:
convention_issues.extend(violations)
6. CALCULATE dimension scores:
scores = {
readability: score_readability(readability_signals),
maintainability: score_maintainability(complexity, size),
performance: score_performance(code, ast),
security: max(0.0, 1.0 - (len(security_issues) * 0.2)),
testability: score_testability(functions, dependencies),
idiom: score_idiom(code, language_patterns)
}
# Weight by requested dimensions
IF quality_dimensions:
relevant_scores = {k: v for k, v in scores.items() if k in quality_dimensions}
ELSE:
relevant_scores = scores
composite_score = weighted_avg(relevant_scores)
SIGNALS:
scores: per-dimension quality scores
issues: all detected issues
composite_score: overall quality
\`\`\`
### ORIENT
Identify improvements by priority:
\`\`\`
1. CATEGORIZE issues:
auto_fixable = []
manual_review = []
FOR issue in all_issues:
IF can_auto_fix(issue):
auto_fixable.append({
issue: issue,
fix: generate_fix(issue),
confidence: fix_confidence(issue),
impact: estimate_impact(issue)
})
ELSE:
manual_review.append({
issue: issue,
suggestion: generate_suggestion(issue),
severity: issue.severity
})
2. PRIORITIZE fixes:
# Sort by impact and confidence
auto_fixable.sort(by=[
impact descending,
confidence descending,
severity descending
])
# Group by type for batch application
fix_groups = group_by_type(auto_fixable)
3. ASSESS risk:
FOR fix in list(auto_fixable):  # iterate a copy; auto_fixable is mutated below
fix.risk = assess_risk(
code: code,
change: fix.fix,
has_tests: code.has_tests
)
IF fix.risk > risk_threshold:
# Move to manual review
manual_review.append(fix)
auto_fixable.remove(fix)
4. ESTIMATE improvement:
IF all auto_fixable applied:
estimated_new_score = simulate_score_after_fixes(
current_score: composite_score,
fixes: auto_fixable
)
will_pass = estimated_new_score >= threshold
\`\`\`
### DECIDE
Commit to refinement strategy:
\`\`\`
1. EVALUATE options:
IF composite_score >= threshold:
decision = "ACCEPT"
rationale = "Code meets quality threshold"
ELIF len(auto_fixable) > 0 AND will_pass:
decision = "REFINE"
rationale = f"Applying {len(auto_fixable)} auto-fixes"
ELIF iteration >= MAX_ITERATIONS:
decision = "ACCEPT_WITH_ISSUES"
rationale = f"Max iterations; {len(manual_review)} issues remain"
ELSE:
decision = "ESCALATE"
rationale = "Cannot auto-fix to meet threshold"
2. SELECT fixes to apply:
IF decision == "REFINE":
# Apply fixes in order of impact
fixes_this_iteration = auto_fixable[:max_fixes_per_iteration]
# Ensure no conflicting fixes
fixes_this_iteration = resolve_conflicts(fixes_this_iteration)
\`\`\`
### ACT
Apply refinements:
\`\`\`
1. IF decision == "ACCEPT":
EMIT quality_accepted {
score: composite_score,
metrics: metrics
}
RETURN {
refined_code: code,
quality_score: composite_score,
improvements: [],
remaining_issues: [],
metrics: metrics
}
2. IF decision == "REFINE":
refined_code = code
FOR fix in fixes_this_iteration:
# Apply the fix
refined_code = apply_fix(refined_code, fix)
# Record the change
improvements.append({
type: fix.type,
location: fix.location,
before: fix.original,
after: fix.fixed,
rationale: fix.rationale,
dimension: fix.dimension
})
changes_made.append(fix)
# Re-score
new_score = calculate_quality_score(refined_code)
score_history.append(new_score)
IF new_score >= threshold:
EMIT quality_improved {
score_before: composite_score,
score_after: new_score,
fixes_applied: len(fixes_this_iteration)
}
RETURN {
refined_code: refined_code,
quality_score: new_score,
improvements: improvements,
remaining_issues: manual_review,
metrics: recalculate_metrics(refined_code)
}
ELSE:
iteration += 1
RETURN to OBSERVE with refined_code
3. IF decision == "ACCEPT_WITH_ISSUES":
EMIT quality_partial {
score: composite_score,
remaining_issues: len(manual_review)
}
RETURN {
refined_code: code,
quality_score: composite_score,
improvements: improvements,
remaining_issues: manual_review,
metrics: metrics
}
4. IF decision == "ESCALATE":
EMIT quality_escalation {
score: composite_score,
blocking_issues: manual_review
}
RETURN {
refined_code: code,
quality_score: composite_score,
improvements: [],
remaining_issues: manual_review,
metrics: metrics
}
\`\`\`
## Termination Conditions
- **Success**: \`quality_score >= threshold\`
- **Failure**: Cannot improve score, issues require manual review
- **Timeout**: \`iteration >= MAX_ITERATIONS\` (default 3)
## Composition
### Can contain (nested loops)
- None (atomic refinement loop)
### Can be contained by
- \`authoring/code-generation\` (post-generation polish)
- \`verification/acceptance-gate\` (pre-acceptance polish)
- \`orchestration/queue-processor\` (batch refinement)
### Parallelizable
- Yes: Independent files can be refined in parallel
- No: Interdependent code units should be sequential
## Signals Emitted
| Signal | When | Payload |
|--------|------|---------|
| \`quality_accepted\` | Meets threshold initially | \`{ score, metrics }\` |
| \`quality_improved\` | Score improved to threshold | \`{ score_before, score_after, fixes }\` |
| \`quality_partial\` | Best effort, issues remain | \`{ score, remaining_issues }\` |
| \`quality_escalation\` | Cannot meet threshold | \`{ score, blocking_issues }\` |
## Auto-Fix Patterns
| Issue Type | Auto-Fix | Confidence |
|------------|----------|------------|
| Long lines | Break at logical points | High |
| Missing semicolons | Add semicolons | High |
| Inconsistent naming | Rename to convention | Medium |
| Dead code | Remove unused | Medium |
| Complex conditionals | Extract to function | Medium |
| Magic numbers | Extract to constant | Medium |
| Nested callbacks | Convert to async/await | Medium |
| Console.log | Remove or convert to logger | High |
## Example Usage
\`\`\`markdown
## Post-Generation Quality Check
FOR each generated_file:
Execute @loops/refinement/code-quality.md with:
INPUT:
code: generated_file.content
quality_dimensions: ["readability", "maintainability", "security"]
conventions: project_conventions
threshold: 0.85
ON quality_improved:
WRITE refined_code to file
LOG "Improved {file}: {score_before} → {score_after}"
ON quality_partial:
WRITE refined_code to file
FLAG remaining_issues for review
ON quality_escalation:
ADD to manual_review_queue
\`\`\`
## Quality Scoring
| Dimension | Weight | Thresholds |
|-----------|--------|------------|
| Readability | 0.25 | >0.8 good, >0.9 excellent |
| Maintainability | 0.25 | <10 cyclomatic good |
| Performance | 0.15 | No O(n²) in hot paths |
| Security | 0.20 | No high severity issues |
| Testability | 0.10 | <5 dependencies per function |
| Idiom | 0.05 | Follows language patterns |
`,
metadata: {
"type": "refinement",
"speed": "fast",
"scope": "file",
"description": "Improve code quality through systematic analysis and refinement.",
"interface_version": "loop-interface@1.0",
"inputs": [],
"outputs": [],
"signals": [],
"composition": {}
},
},
'consistency-check': {
content: `# Consistency Check Loop
Validate cross-references and ensure consistency across documents/code.
**Version**: 1.0.0
**Interface**: loop-interface@1.0
## Classification
- **Type**: refinement
- **Speed**: fast (~5-20s per check)
- **Scope**: collection (multiple files/documents)
## Interface
### Inputs
| Name | Type | Required | Description |
|------|------|----------|-------------|
| artifacts | List<Artifact> | yes | Documents/code to check for consistency |
| check_types | List<CheckType> | no | Types of consistency to verify |
| reference_sources | List<Source> | no | Authoritative sources for verification |
### Outputs
| Name | Type | Description |
|------|------|-------------|
| consistent | Boolean | Whether all checks passed |
| inconsistencies | List<Inconsistency> | Found inconsistencies |
| resolutions | List<Resolution> | Suggested or applied fixes |
| cross_ref_map | CrossRefMap | Valid cross-references |
| terminology_report | TermReport | Term usage consistency |
### State
| Field | Type | Description |
|-------|------|-------------|
| terms_seen | Map<Term, List<Usage>> | Term occurrences |
| refs_validated | Set<RefId> | Validated references |
## Types
\`\`\`typescript
type CheckType =
| "cross_references" // Links between documents
| "terminology" // Consistent term usage
| "versioning" // Version number consistency
| "naming" // Identifier naming
| "schema" // Data structure consistency
| "api_contracts" // API consistency
| "dependencies" // Dependency versions
type Inconsistency = {
type: CheckType
severity: "error" | "warning" | "info"
locations: List<Location>
description: string
suggested_resolution: string
}
type Resolution = {
inconsistency_id: string
action: "auto_fixed" | "manual_required" | "acceptable"
changes: List<Change>
}
\`\`\`
## OODA Phases
### OBSERVE
Collect consistency signals across artifacts:
\`\`\`
1. INDEX all artifacts:
FOR artifact in artifacts:
IF artifact.type == "spec":
index_spec(artifact)
# Extract: requirements, terms, references
ELIF artifact.type == "code":
index_code(artifact)
# Extract: functions, types, imports
ELIF artifact.type == "doc":
index_doc(artifact)
# Extract: sections, links, terms
2. EXTRACT cross-references:
all_refs = []
FOR artifact in artifacts:
refs = extract_references(artifact)
# @spec-name.md, FR-001, #section, import X
FOR ref in refs:
all_refs.append({
from_artifact: artifact,
from_location: ref.location,
to_target: ref.target,
ref_type: ref.type
})
3. COLLECT terminology:
FOR artifact in artifacts:
terms = extract_terms(artifact)
# Domain terms, identifiers, acronyms
FOR term in terms:
IF term in terms_seen:
terms_seen[term].append({
artifact: artifact,
context: term.context,
definition: term.definition if defined
})
ELSE:
terms_seen[term] = [usage]
4. GATHER versioning info:
versions = {}
FOR artifact in artifacts:
version_refs = extract_versions(artifact)
# Package versions, spec versions, API versions
FOR ref in version_refs:
IF ref.package in versions:
versions[ref.package].append({
artifact: artifact,
version: ref.version
})
ELSE:
versions[ref.package] = [ref]
5. MAP schemas/contracts:
schemas = {}
FOR artifact in artifacts:
IF has_schema(artifact):
schema = extract_schema(artifact)
schemas[schema.name] = {
artifact: artifact,
fields: schema.fields,
types: schema.types
}
SIGNALS:
all_refs: cross-references to validate
terms_seen: term usage map
versions: version references
schemas: schema definitions
\`\`\`
### ORIENT
Analyze for inconsistencies:
\`\`\`
1. VALIDATE cross-references:
IF "cross_references" in check_types:
FOR ref in all_refs:
target = resolve_target(ref.to_target, artifacts)
IF target is None:
inconsistencies.append({
type: "cross_references",
severity: "error",
locations: [ref.from_location],
description: f"Broken reference to {ref.to_target}",
suggested_resolution: find_similar_target(ref.to_target)
})
ELSE:
cross_ref_map.add_valid(ref, target)
refs_validated.add(ref.id)
2. CHECK terminology consistency:
IF "terminology" in check_types:
FOR term, usages in terms_seen.items():
# Check for inconsistent definitions
definitions = unique(u.definition for u in usages if u.definition)
IF len(definitions) > 1:
inconsistencies.append({
type: "terminology",
severity: "warning",
locations: [u.artifact for u in usages],
description: f"Term '{term}' has multiple definitions",
suggested_resolution: "Consolidate to single definition"
})
# Check for inconsistent casing/spelling
spellings = unique(u.surface_form for u in usages)
IF len(spellings) > 1:
canonical = most_common(spellings)
inconsistencies.append({
type: "terminology",
severity: "info",
locations: [u.artifact for u in usages],
description: f"Inconsistent spelling: {spellings}",
suggested_resolution: f"Standardize to '{canonical}'"
})
3. VERIFY version consistency:
IF "versioning" in check_types:
FOR package, refs in versions.items():
version_set = unique(r.version for r in refs)
IF len(version_set) > 1:
inconsistencies.append({
type: "versioning",
severity: "error",
locations: [r.artifact for r in refs],
description: f"{package} has multiple versions: {version_set}",
suggested_resolution: f"Align to {max(version_set, key=parse_version)}"
})
4. CHECK schema consistency:
IF "schema" in check_types:
# Compare schema definitions across artifacts
FOR schema_name, definitions in group_schemas_by_name(schemas):
IF len(definitions) > 1:
diffs = compare_schemas(definitions)
IF diffs:
inconsistencies.append({
type: "schema",
severity: "error",
locations: [d.artifact for d in definitions],
description: f"Schema '{schema_name}' has conflicting definitions",
suggested_resolution: "Reconcile field differences"
})
5. VERIFY API contracts:
IF "api_contracts" in check_types:
# Compare spec API with implementation
FOR endpoint in spec_endpoints:
impl = find_implementation(endpoint)
IF impl:
diff = compare_contract(endpoint, impl)
IF diff:
inconsistencies.append({
type: "api_contracts",
severity: "error",
locations: [endpoint.location, impl.location],
description: f"API mismatch: {diff}",
suggested_resolution: "Update spec or implementation"
})
6. CALCULATE consistency score:
total_checks = len(all_refs) + len(terms_seen) + len(versions) + len(schemas)
issues = len(inconsistencies)
consistency_score = (total_checks - issues) / total_checks if total_checks > 0 else 1.0
\`\`\`
### DECIDE
Determine resolution strategy:
\`\`\`
1. CATEGORIZE inconsistencies:
auto_resolvable = []
manual_required = []
FOR issue in inconsistencies:
IF can_auto_resolve(issue):
auto_resolvable.append({
issue: issue,
resolution: generate_resolution(issue),
confidence: resolution_confidence(issue)
})
ELSE:
manual_required.append(issue)
2. PRIORITIZE by severity:
errors = filter(inconsistencies, severity == "error")
warnings = filter(inconsistencies, severity == "warning")
info = filter(inconsistencies, severity == "info")
IF len(errors) > 0:
priority = "must_resolve"
ELIF len(warnings) > 0:
priority = "should_resolve"
ELSE:
priority = "optional"
3. DECIDE on action:
IF len(inconsistencies) == 0:
decision = "CONSISTENT"
ELIF len(errors) == 0 AND len(auto_resolvable) == len(inconsistencies):
decision = "AUTO_FIX"
ELIF len(errors) > 0:
decision = "BLOCK"
ELSE:
decision = "PARTIAL_FIX"
\`\`\`
### ACT
Apply resolutions:
\`\`\`
1. IF decision == "CONSISTENT":
EMIT consistency_verified {
artifacts: len(artifacts),
refs_validated: len(refs_validated),
terms_checked: len(terms_seen)
}
RETURN {
consistent: True,
inconsistencies: [],
resolutions: [],
cross_ref_map: cross_ref_map,
terminology_report: generate_term_report(terms_seen)
}
2. IF decision == "AUTO_FIX":
FOR resolution in auto_resolvable:
IF resolution.confidence > confidence_threshold:
apply_resolution(resolution)
resolutions.append({
inconsistency_id: resolution.issue.id,
action: "auto_fixed",
changes: resolution.changes
})
EMIT consistency_fixed {
fixes_applied: len(resolutions),
remaining: 0
}
RETURN {
consistent: True,
inconsistencies: [],
resolutions: resolutions,
cross_ref_map: cross_ref_map,
terminology_report: generate_term_report(terms_seen)
}
3. IF decision == "PARTIAL_FIX":
FOR resolution in auto_resolvable:
IF resolution.confidence > confidence_threshold:
apply_resolution(resolution)
resolutions.append({...})
FOR issue in manual_required:
resolutions.append({
inconsistency_id: issue.id,
action: "manual_required",
suggestion: issue.suggested_resolution
})
EMIT consistency_partial {
fixed: len(resolutions) - len(manual_required),
manual_needed: len(manual_required)
}
RETURN {
consistent: False,
inconsistencies: manual_required,
resolutions: resolutions,
cross_ref_map: cross_ref_map,
terminology_report: generate_term_report(terms_seen)
}
4. IF decision == "BLOCK":
EMIT consistency_blocked {
error_count: len(errors),
blocking_issues: errors
}
RETURN {
consistent: False,
inconsistencies: inconsistencies,
resolutions: [],
cross_ref_map: partial_map,
terminology_report: generate_term_report(terms_seen)
}
\`\`\`
## Termination Conditions
- **Success**: All checks pass or only auto-fixable issues
- **Failure**: Blocking inconsistencies that require manual resolution
- **Timeout**: N/A (fast loop, no timeout)
## Composition
### Can contain (nested loops)
- None (atomic check loop)
### Can be contained by
- \`verification/acceptance-gate\` (as validation check)
- \`authoring/spec-drafting\` (post-draft validation)
- Workflow commands
### Parallelizable
- Conditional: Different check types can run in parallel
- No: Cross-artifact checks need full artifact set
## Signals Emitted
| Signal | When | Payload |
|--------|------|---------|
| \`consistency_verified\` | All checks pass | \`{ artifacts, refs, terms }\` |
| \`consistency_fixed\` | All issues auto-fixed | \`{ fixes_applied }\` |
| \`consistency_partial\` | Some manual fixes needed | \`{ fixed, manual_needed }\` |
| \`consistency_blocked\` | Blocking issues found | \`{ error_count, issues }\` |
## Check Patterns
### Cross-Reference Validation
\`\`\`
Pattern: @[spec-name]#[section]
Example: @auth-spec.md#token-handling
Validation:
1. File exists
2. Section/anchor exists
3. Content still relevant
\`\`\`
### Terminology Consistency
\`\`\`
Check: Same term, same definition
Example: "Access Token" defined consistently
Detection:
1. Extract all term definitions
2. Group by normalized term
3. Compare definitions
4. Flag differences
\`\`\`
### Version Alignment
\`\`\`
Check: Same package, same version
Example: "effect": "^3.0.0" in all package.json
Detection:
1. Extract version declarations
2. Group by package name
3. Compare version strings
4. Flag mismatches
\`\`\`
## Example Usage
\`\`\`markdown
## Integration Validation
Execute @loops/refinement/consistency-check.md with:
INPUT:
artifacts: [spec_a, spec_b, spec_c, implementation]
check_types: ["cross_references", "terminology", "api_contracts"]
ON consistency_verified:
PROCEED to next phase
ON consistency_partial:
APPLY auto-fixes
FLAG manual issues for review
ON consistency_blocked:
HALT workflow
REPORT blocking issues
\`\`\`
`,
metadata: {
"type": "refinement",
"speed": "fast",
"scope": "collection",
"description": "Validate cross-references and ensure consistency across documents and code.",
"interface_version": "loop-interface@1.0",
"inputs": [],
"outputs": [],
"signals": [],
"composition": {}
},
},
'requirement-quality': {
content: `# Requirement Quality Loop
Refine individual requirements for clarity, testability, and implementability.
**Version**: 1.0.0
**Interface**: loop-interface@1.0
## Classification
- **Type**: refinement
- **Speed**: fast (~5-15s per requirement)
- **Scope**: item
## Interface
### Inputs
| Name | Type | Required | Description |
|------|------|----------|-------------|
| requirement | Requirement | yes | The requirement to refine |
| context | SpecContext | no | Surrounding spec context |
| related_requirements | List<Requirement> | no | Other requirements to check consistency |
| threshold | Score | no | Minimum quality score (default: 0.85) |
### Outputs
| Name | Type | Description |
|------|------|-------------|
| refined_requirement | Requirement | Improved requirement |
| quality_score | Score | Final quality score |
| transformations | List<Transformation> | Changes applied |
| conflicts | List<Conflict> | Detected conflicts with related requirements |
| acceptance_criteria | List<Text> | Generated acceptance criteria |
### State
| Field | Type | Description |
|-------|------|-------------|
| iteration | Number | Refinement iteration |
| score_history | List<Score> | Score at each iteration |
| transformation_log | List<Transformation> | All transformations applied |
## OODA Phases
### OBSERVE
Assess requirement quality:
\`\`\`
1. APPLY SMART criteria scoring:
SPECIFIC (0-1):
1.0: Precise values, no ambiguity
0.7: Mostly precise, minor ambiguity
0.4: General direction, significant ambiguity
0.0: Completely vague
MEASURABLE (0-1):
1.0: Clear pass/fail criteria exist
0.7: Criteria exist but need interpretation
0.4: Partial criteria, some aspects unmeasurable
0.0: No way to verify completion
ACHIEVABLE (0-1):
1.0: Clearly feasible with known techniques
0.7: Feasible with some research/exploration
0.4: Uncertain feasibility
0.0: Appears impossible or contradictory
RELEVANT (0-1):
1.0: Directly serves spec objectives
0.7: Indirectly serves objectives
0.4: Tangentially related
0.0: Unrelated to spec purpose
TIME-BOUND (0-1):
1.0: Clear phases/milestones
0.7: Implicit timeline
0.4: No timeline but not blocking
0.0: Timeline needed but missing
2. DETECT anti-patterns:
VAGUE:
indicators = [
"should handle ... appropriately",
"should be user-friendly",
"should be performant",
"should be secure",
"etc.", "and so on"
]
detected = any(indicator in requirement.text)
COMPOUND:
indicators = [
" and ",
" also ",
", as well as",
multiple_verbs > 1
]
detected = contains_multiple_distinct_requirements
ASSUMPTIVE:
indicators = [
references undefined terms,
assumes prior implementation,
implicit dependencies
]
detected = has_unstated_dependencies
UNTESTABLE:
indicators = [
no quantifiable criteria,
subjective terms,
no observable behavior
]
detected = cannot_write_acceptance_test
3. CHECK consistency:
For each related_requirement:
→ Do they contradict?
→ Do they overlap?
→ Do they have implicit dependencies?
conflicts = detected contradictions/overlaps
OBSERVATION OUTPUTS:
smart_scores = {S, M, A, R, T}
composite_score = avg(smart_scores)
anti_patterns = [detected patterns]
conflicts = [detected conflicts]
\`\`\`
### ORIENT
Identify improvement strategy:
\`\`\`
1. CALCULATE priority areas:
weak_dimensions = filter(smart_scores, score < 0.7)
priority_order = [
"SPECIFIC" if detected "VAGUE",
"MEASURABLE" if detected "UNTESTABLE",
dimensions in weak_dimensions by score ascending
]
2. SELECT transformation type:
IF "VAGUE" in anti_patterns:
transformation = "CLARIFY"
strategy = add_specific_values_and_examples
ELIF "COMPOUND" in anti_patterns:
transformation = "SPLIT"
strategy = decompose_into_atomic_requirements
ELIF "UNTESTABLE" in anti_patterns:
transformation = "REFRAME"
strategy = convert_to_given_when_then
ELIF "ASSUMPTIVE" in anti_patterns:
transformation = "EXPLICIT"
strategy = surface_hidden_dependencies
ELIF conflicts.length > 0:
transformation = "RECONCILE"
strategy = resolve_conflicts
ELIF composite_score < threshold:
transformation = "STRENGTHEN"
strategy = improve_weakest_dimension
3. ESTIMATE improvement potential:
expected_improvement = estimate based on transformation type
will_pass = (composite_score + expected_improvement) >= threshold
\`\`\`
### DECIDE
Commit to transformation:
\`\`\`
IF composite_score >= threshold:
decision = "ACCEPT"
rationale = "Requirement meets quality threshold"
ELIF iteration >= MAX_ITERATIONS:
decision = "ACCEPT_WITH_CAVEATS"
rationale = "Max iterations reached, best effort result"
caveats = [weak_dimensions, remaining_anti_patterns]
ELIF transformation == "SPLIT" AND would_exceed_requirement_limit:
decision = "FLAG_FOR_REVIEW"
rationale = "Splitting would create too many requirements"
ELSE:
decision = "TRANSFORM"
rationale = f"Applying {transformation} to address {issues}"
\`\`\`
### ACT
Apply the transformation:
\`\`\`
IF decision == "ACCEPT":
refined_requirement = requirement
EMIT requirement_accepted
ELIF decision == "ACCEPT_WITH_CAVEATS":
refined_requirement = requirement
refined_requirement.caveats = caveats
EMIT requirement_accepted_with_caveats
ELIF decision == "FLAG_FOR_REVIEW":
refined_requirement = requirement
refined_requirement.needs_review = True
EMIT requirement_flagged
ELIF decision == "TRANSFORM":
IF transformation == "CLARIFY":
"""
BEFORE (vague):
"The system should handle errors gracefully"
AFTER (specific):
"When API calls fail, the system shall:
a) Log error with correlation ID to stderr
b) Return HTTP 500 with JSON body: {error_code, message, correlation_id}
c) Increment error_count metric with labels {endpoint, error_type}
d) Trigger PagerDuty alert if error rate exceeds 1% over 5 minutes"
"""
refined_text = add_specific_values(requirement.text)
refined_text = add_examples(refined_text)
refined_text = remove_vague_qualifiers(refined_text)
ELIF transformation == "SPLIT":
"""
BEFORE (compound):
"Users can create, edit, and delete documents with version history"
AFTER (atomic):
FR-001: "Users can create new documents with title (required) and content (optional)"
FR-002: "Users can edit the title and content of documents they own"
FR-003: "Users can delete documents they own (soft delete, recoverable for 30 days)"
FR-004: "The system maintains version history for all document changes"
FR-005: "Users can view version history and restore previous versions"
"""
atomic_requirements = decompose(requirement)
refined_requirement = atomic_requirements # Returns list
ELIF transformation == "REFRAME":
"""
BEFORE (untestable):
"The UI should be responsive"
AFTER (testable - Given/When/Then):
"GIVEN a user on any viewport from 320px to 2560px width
WHEN they interact with any UI element
THEN the element shall respond within 100ms
AND the layout shall not break or overflow"
"""
refined_text = convert_to_given_when_then(requirement.text)
acceptance_criteria = extract_test_cases(refined_text)
ELIF transformation == "EXPLICIT":
"""
BEFORE (assumptive):
"Notifications should be sent when status changes"
AFTER (explicit):
"When document status changes from any state to 'published':
- Dependency: Notification service must be available (see NFR-012)
- Dependency: User has notification preferences set (see FR-023)
The system shall:
a) Queue notification within 1 second of status change
b) Deliver via channels matching user preferences
c) Include: document title, old status, new status, changed_by"
"""
refined_text = surface_dependencies(requirement.text)
refined_text = add_dependency_references(refined_text)
ELIF transformation == "RECONCILE":
"""
Resolve conflict between requirements
"""
refined_text = merge_or_prioritize(requirement, conflicts)
resolution_notes = document_resolution(conflicts)
# Record transformation
transformation_log.append({
original: requirement.text,
refined: refined_text,
transformation: transformation,
rationale: rationale
})
# Update requirement
refined_requirement = Requirement(
id: requirement.id,
text: refined_text,
priority: requirement.priority,
acceptance_criteria: acceptance_criteria or generate_criteria(refined_text)
)
# Re-score
new_score = assess_quality(refined_requirement)
score_history.append(new_score)
IF new_score >= threshold:
EMIT requirement_refined {requirement_id, score_before, score_after, transformation}
ELSE:
iteration += 1
RETURN to OBSERVE # Continue refining
\`\`\`
## Termination Conditions
- **Success**: \`quality_score >= threshold\`
- **Failure**: Transformation would violate constraints (too many requirements, etc.)
- **Timeout**: \`iteration >= MAX_ITERATIONS\` (default 3)
## Composition
### Can contain (nested loops)
- None (atomic loop)
### Can be contained by
- \`authoring/spec-drafting\` (inline refinement)
- \`orchestration/queue-processor\` (batch refinement)
- \`verification/acceptance-gate\` (pre-acceptance polish)
### Parallelizable
- Yes: Independent requirements can be refined in parallel
- Conditional: Requirements with potential conflicts should be sequential
## Signals Emitted
| Signal | When | Payload |
|--------|------|---------|
| \`requirement_accepted\` | Meets threshold | \`{ id, score }\` |
| \`requirement_refined\` | Successfully transformed | \`{ id, score_before, score_after, transformation }\` |
| \`requirement_split\` | Decomposed into multiple | \`{ original_id, new_ids: List }\` |
| \`requirement_flagged\` | Needs manual review | \`{ id, reason, caveats }\` |
| \`conflict_detected\` | Inconsistency found | \`{ requirement_ids, conflict_type }\` |
## Transformation Examples
### VAGUE → SPECIFIC
| Before | After |
|--------|-------|
| "fast response times" | "95th percentile response time ≤ 200ms" |
| "handle high load" | "sustain 10,000 concurrent connections" |
| "secure authentication" | "OAuth 2.0 with PKCE, tokens expire in 1 hour" |
| "user-friendly error messages" | "error messages include: error code, human-readable description, suggested action, support link" |
### COMPOUND → ATOMIC
| Before | After |
|--------|-------|
| "Users can search, filter, and sort results" | FR-1: "Users can search by keyword" <br> FR-2: "Users can filter by category, date, status" <br> FR-3: "Users can sort by relevance, date, title" |
### UNTESTABLE → TESTABLE
| Before | After |
|--------|-------|
| "The system should be reliable" | "GIVEN normal operation <br> WHEN the system runs for 24 hours <br> THEN uptime shall be ≥ 99.9% <br> AND no data loss shall occur" |
## Example Usage
\`\`\`markdown
## Phase 3: Refinement
For each requirement in spec.requirements:
Execute @loops/refinement/requirement-quality.md with:
INPUT:
requirement: requirement
context: spec_context
related_requirements: spec.requirements
threshold: 0.85
CONFIG:
MAX_ITERATIONS: 3
ON requirement_refined:
UPDATE requirement in spec
ON requirement_split:
REPLACE requirement with new atomic requirements
RE-INDEX requirement IDs
ON requirement_flagged:
ADD to manual_review_queue
ON conflict_detected:
ESCALATE to user for resolution
\`\`\`
## Quality Checklist
Before accepting a requirement:
- [ ] No vague qualifiers remain (appropriately, properly, etc.)
- [ ] All values are specific and measurable
- [ ] Acceptance criteria are testable (can write automated test)
- [ ] Dependencies are explicit and referenced
- [ ] Scope is clear (what's NOT included)
- [ ] No conflicts with other requirements
`,
metadata: {
"type": "refinement",
"speed": "fast",
"scope": "item",
"description": "Refine individual requirements for clarity, testability, and implementability.",
"interface_version": "loop-interface@1.0",
"inputs": [],
"outputs": [],
"signals": [],
"composition": {}
},
}
},
'verification': {
'acceptance-gate': {
content: `# Acceptance Gate Loop
Validate work against acceptance criteria before proceeding.
**Version**: 1.0.0
**Interface**: loop-interface@1.0
## Classification
- **Type**: verification
- **Speed**: medium (~30s-2min per gate)
- **Scope**: document or milestone
## Interface
### Inputs
| Name | Type | Required | Description |
|------|------|----------|-------------|
| artifact | Artifact | yes | The artifact to validate (spec, code, etc.) |
| acceptance_criteria | List<Criterion> | yes | Criteria that must pass |
| validation_type | ValidationType | no | How to validate (default: "automated") |
| threshold | Score | no | Minimum pass score (default: 1.0 for all must pass) |
### Outputs
| Name | Type | Description |
|------|------|-------------|
| passed | Boolean | Whether gate passed |
| results | List<CriterionResult> | Result for each criterion |
| score | Score | Overall pass percentage |
| blockers | List<Criterion> | Failed criteria blocking progress |
| warnings | List<Criterion> | Failed non-blocking criteria |
| remediation | List<Action> | Suggested fixes for failures |
## OODA Phases
### OBSERVE
Gather validation evidence for each criterion:
\`\`\`
FOR each criterion in acceptance_criteria:
IF criterion.validation.type == "automated":
# Run automated validation (tests, lint, type check)
result = run_validation(artifact, criterion.test_spec)
evidence = { type: "automated", output, exit_code, duration }
ELIF criterion.validation.type == "manual":
# Agent evaluates checklist items
FOR each item in criterion.checklist:
item_result = evaluate_checklist_item(artifact, item)
evidence = { type: "manual", checklist_results }
observations.append({ criterion, evidence })
\`\`\`
### ORIENT
Assess validation results:
\`\`\`
results = []
blockers = []
warnings = []
FOR each observation:
# Determine pass/fail
passed = evaluate_evidence(observation.evidence)
confidence = calculate_confidence(observation.evidence)
result = CriterionResult(criterion_id, passed, evidence, confidence)
results.append(result)
# Categorize failures
IF NOT passed:
IF criterion.priority == "must":
blockers.append(criterion)
ELSE:
warnings.append(criterion)
# Calculate overall score
score = passed_count / total_count
# Determine if gate passes
all_musts_passed = all must-priority criteria passed
gate_passed = all_musts_passed AND score >= threshold
\`\`\`
### DECIDE
Determine gate outcome:
\`\`\`
IF gate_passed:
decision = "PASS"
ELIF attempt >= MAX_ATTEMPTS:
decision = "FAIL_FINAL"
ELIF can_auto_remediate(blockers):
decision = "REMEDIATE"
remediation = generate_remediation_actions(blockers)
ELIF len(blockers) > 0:
decision = "FAIL_BLOCKING"
remediation = generate_remediation_suggestions(blockers)
ELSE: # Only warnings
decision = "PASS_WITH_WARNINGS"
\`\`\`
### ACT
Execute decision:
\`\`\`
IF decision == "PASS":
EMIT gate_passed { artifact, score, results }
RETURN { passed: True, ... }
ELIF decision == "PASS_WITH_WARNINGS":
EMIT gate_passed_with_warnings { artifact, score, warnings }
RETURN { passed: True, warnings, ... }
ELIF decision == "REMEDIATE":
FOR each action in remediation:
execute_remediation(action)
attempt += 1
RETURN to OBSERVE
ELIF decision == "FAIL_BLOCKING":
EMIT gate_failed { artifact, blockers, remediation }
RETURN { passed: False, blockers, ... }
ELIF decision == "FAIL_FINAL":
EMIT gate_failed_final { artifact, attempts, blockers }
RETURN { passed: False, blockers, remediation: [] }
\`\`\`
## Termination Conditions
- **Success**: \`decision == "PASS"\` or \`decision == "PASS_WITH_WARNINGS"\`
- **Failure**: \`decision == "FAIL_BLOCKING"\` or \`decision == "FAIL_FINAL"\`
- **Timeout**: \`attempt >= MAX_ATTEMPTS\` (default 3)
## Composition
### Can contain (nested loops)
- \`refinement/requirement-quality\` (inline fixes)
- \`refinement/code-quality\` (inline fixes)
### Can be contained by
- \`orchestration/queue-processor\` (milestone gates)
- \`authoring/spec-drafting\` (post-draft validation)
- \`authoring/code-generation\` (post-implementation validation)
### Parallelizable
- Conditional: Independent criteria can validate in parallel
## Signals Emitted
| Signal | When | Payload |
|--------|------|---------|
| \`gate_passed\` | All must criteria pass | \`{ artifact, score, results }\` |
| \`gate_passed_with_warnings\` | Musts pass, shoulds fail | \`{ artifact, score, warnings }\` |
| \`gate_failed\` | Must criteria failed | \`{ artifact, blockers, remediation }\` |
| \`gate_failed_final\` | Max attempts exhausted | \`{ artifact, attempts, blockers }\` |
## Validation Patterns
### Spec Validation Criteria
\`\`\`yaml
acceptance_criteria:
- id: SPEC-001
description: "All required sections present"
priority: must
validation:
type: automated
test: { check: sections_present, required: [Summary, Requirements] }
- id: SPEC-002
description: "All requirements have acceptance criteria"
priority: must
validation:
type: automated
test: { check: requirements_have_criteria }
- id: SPEC-003
description: "No TBD markers in must-have sections"
priority: must
validation:
type: automated
test: { check: no_tbd_in_sections, sections: [Requirements, Design] }
\`\`\`
### Code Validation Criteria
\`\`\`yaml
acceptance_criteria:
- id: CODE-001
description: "All tests pass"
priority: must
validation:
type: automated
test: { command: "npm test", expect_exit: 0 }
- id: CODE-002
description: "Type check passes"
priority: must
validation:
type: automated
test: { command: "npm run typecheck", expect_exit: 0 }
\`\`\`
## Example Usage
\`\`\`markdown
Execute @loops/verification/acceptance-gate.md with:
INPUT:
artifact: { type: "spec", document: drafted_spec }
acceptance_criteria: spec_validation_criteria
threshold: 1.0
ON gate_passed:
PROCEED to next phase
ON gate_failed:
IF has remediation:
EXECUTE remediation
RE-RUN gate
ELSE:
ESCALATE to user
\`\`\`
`,
metadata: {
"type": "verification",
"speed": "medium",
"scope": "document",
"description": "Validate work against acceptance criteria before proceeding.",
"interface_version": "loop-interface@1.0",
"inputs": [],
"outputs": [],
"signals": [],
"composition": {}
},
},
'fact-checking': {
content: `# Fact Checking Loop
Verify claims against sources of truth with confidence scoring.
**Version**: 1.0.0
**Interface**: loop-interface@1.0
## Classification
- **Type**: verification
- **Speed**: medium (~15-60s per claim)
- **Scope**: item (single claim)
## Interface
### Inputs
| Name | Type | Required | Description |
|------|------|----------|-------------|
| claim | Claim | yes | The claim to verify |
| source_types | List<SourceType> | no | Types of sources to check |
| confidence_threshold | Score | no | Min confidence to accept (default: 0.85) |
| auto_correct | Boolean | no | Whether to generate corrections (default: true) |
### Outputs
| Name | Type | Description |
|------|------|-------------|
| verdict | Verdict | Verification result |
| confidence | Score | Confidence in verdict |
| evidence | List<Evidence> | Supporting/contradicting evidence |
| correction | Correction | Suggested fix if claim is false |
| sources | List<Source> | All sources consulted |
### State
| Field | Type | Description |
|-------|------|-------------|
| evidence_collected | List<Evidence> | Evidence gathered so far |
| sources_checked | Set<Source> | Sources already consulted |
## Types
\`\`\`typescript
type Claim = {
id: string
text: string
source_document: string
line_number: number
category: "codebase" | "external" | "internal"
}
type Verdict =
| "VERIFIED" // Claim is accurate
| "CONTRADICTED" // Claim conflicts with evidence
| "OUTDATED" // Claim was true but is no longer
| "UNVERIFIABLE" // Cannot find evidence either way
| "PARTIAL" // Partially true, needs nuance
type Evidence = {
type: "supporting" | "contradicting" | "contextual"
source: Source
content: string
relevance: Score
recency: Date
}
type SourceType =
| "codebase" // Search actual code
| "documentation" // Official docs
| "web" // Web search
| "api" // Live API calls
| "database" // Database queries
\`\`\`
## OODA Phases
### OBSERVE
Gather evidence for the claim:
\`\`\`
1. PARSE claim:
# Extract verifiable components
entities = extract_entities(claim.text)
# Functions, classes, endpoints, values
assertions = extract_assertions(claim.text)
# "X does Y", "X returns Z", "X is configured as W"
quantifiers = extract_quantifiers(claim.text)
# "always", "never", "up to N", "at least M"
2. SEARCH by source type:
IF "codebase" in source_types:
FOR entity in entities:
# Search for entity in code
results = SemanticSearch(
query: f"Where is {entity} defined or used?",
target: codebase
)
FOR result in results:
code_content = Read(result.file, result.lines)
evidence_collected.append({
type: assess_support(code_content, claim),
source: { type: "codebase", location: result },
content: code_content,
relevance: result.score,
recency: file_modified_date(result.file)
})
sources_checked.add(result.file)
IF "documentation" in source_types:
# Search official documentation
doc_query = formulate_doc_query(claim, entities)
results = search_docs(doc_query)
FOR result in results:
evidence_collected.append({
type: assess_support(result.content, claim),
source: { type: "documentation", url: result.url },
content: result.relevant_section,
relevance: result.score,
recency: result.last_updated
})
IF "web" in source_types:
# Web search for external claims
web_query = formulate_web_query(claim)
results = WebSearch(web_query)
FOR result in results[:5]:
content = Scrape(result.url)
relevant = extract_relevant_section(content, claim)
evidence_collected.append({
type: assess_support(relevant, claim),
source: { type: "web", url: result.url },
content: relevant,
relevance: calculate_relevance(relevant, claim),
recency: extract_date(content)
})
IF "api" in source_types:
# Make live API calls to verify
FOR assertion in assertions:
IF is_api_verifiable(assertion):
response = call_api(assertion.endpoint, assertion.params)
evidence_collected.append({
type: compare_response(response, assertion.expected),
source: { type: "api", endpoint: assertion.endpoint },
content: response,
relevance: 1.0, # Direct verification
recency: now()
})
3. ASSESS evidence quality:
FOR evidence in evidence_collected:
evidence.quality_score = calculate_quality(
source_authority: source_authority(evidence.source),
recency: days_since(evidence.recency),
relevance: evidence.relevance,
specificity: how_specific(evidence.content, claim)
)
SIGNALS:
evidence_collected: all evidence with assessments
sources_checked: sources consulted
\`\`\`
### ORIENT
Synthesize evidence into verdict:
\`\`\`
1. CATEGORIZE evidence:
supporting = filter(evidence_collected, type == "supporting")
contradicting = filter(evidence_collected, type == "contradicting")
contextual = filter(evidence_collected, type == "contextual")
2. WEIGHT evidence:
# Higher weight for authoritative, recent, relevant sources
supporting_weight = sum(
e.quality_score * e.relevance
for e in supporting
)
contradicting_weight = sum(
e.quality_score * e.relevance
for e in contradicting
)
total_weight = supporting_weight + contradicting_weight
3. DETERMINE initial verdict:
IF total_weight == 0:
initial_verdict = "UNVERIFIABLE"
confidence = 0.0
ELIF contradicting_weight > supporting_weight * 1.5:
initial_verdict = "CONTRADICTED"
confidence = contradicting_weight / total_weight
ELIF supporting_weight > contradicting_weight * 1.5:
initial_verdict = "VERIFIED"
confidence = supporting_weight / total_weight
ELSE:
initial_verdict = "PARTIAL"
confidence = abs(supporting_weight - contradicting_weight) / total_weight
4. CHECK for outdated:
IF initial_verdict == "CONTRADICTED":
# Check if claim was previously true
old_evidence = filter(evidence_collected,
recency < claim_date AND type == "supporting"
)
new_evidence = filter(evidence_collected,
recency > claim_date AND type == "contradicting"
)
IF len(old_evidence) > 0 AND len(new_evidence) > 0:
initial_verdict = "OUTDATED"
5. IDENTIFY key evidence:
# Select most important evidence for each category
key_supporting = top_k(supporting, by=quality_score, k=3)
key_contradicting = top_k(contradicting, by=quality_score, k=3)
\`\`\`
### DECIDE
Commit to verdict and correction strategy:
\`\`\`
1. FINALIZE verdict:
IF confidence >= confidence_threshold:
final_verdict = initial_verdict
ELIF initial_verdict in ["VERIFIED", "CONTRADICTED"]:
final_verdict = "PARTIAL"
# Lower confidence means less certainty
ELSE:
final_verdict = initial_verdict
2. DECIDE on correction:
IF auto_correct AND final_verdict in ["CONTRADICTED", "OUTDATED", "PARTIAL"]:
should_correct = True
IF final_verdict == "CONTRADICTED":
correction_type = "replace"
# Generate replacement from evidence
ELIF final_verdict == "OUTDATED":
correction_type = "update"
# Update with current information
ELIF final_verdict == "PARTIAL":
correction_type = "clarify"
# Add nuance or qualifications
ELSE:
should_correct = False
3. ASSESS correction confidence:
IF should_correct:
correction_confidence = min(
confidence,
avg(e.quality_score for e in key_contradicting)
)
\`\`\`
### ACT
Generate outputs:
\`\`\`
1. FORMAT evidence:
evidence_output = []
FOR e in key_supporting + key_contradicting:
evidence_output.append({
type: e.type,
source: format_source(e.source),
content: truncate(e.content, 500),
relevance: e.relevance,
recency: e.recency
})
2. GENERATE correction if needed:
IF should_correct:
IF correction_type == "replace":
# Generate replacement claim from contradicting evidence
correction = {
type: "replace",
original: claim.text,
corrected: generate_corrected_claim(
claim,
key_contradicting
),
confidence: correction_confidence,
sources: [e.source for e in key_contradicting]
}
ELIF correction_type == "update":
correction = {
type: "update",
original: claim.text,
corrected: update_claim_with_current(
claim,
key_contradicting
),
confidence: correction_confidence,
note: "Information has changed since original writing"
}
ELIF correction_type == "clarify":
correction = {
type: "clarify",
original: claim.text,
corrected: add_nuance(
claim,
key_supporting,
key_contradicting
),
confidence: correction_confidence,
note: "Added qualifications for accuracy"
}
ELSE:
correction = None
3. EMIT verdict:
IF final_verdict == "VERIFIED":
EMIT claim_verified {
claim_id: claim.id,
confidence: confidence,
key_evidence: key_supporting[0] if key_supporting else None
}
ELIF final_verdict == "CONTRADICTED":
EMIT claim_contradicted {
claim_id: claim.id,
confidence: confidence,
contradiction: key_contradicting[0].content
}
ELIF final_verdict == "OUTDATED":
EMIT claim_outdated {
claim_id: claim.id,
was_true_until: estimate_change_date(evidence_collected)
}
ELIF final_verdict == "UNVERIFIABLE":
EMIT claim_unverifiable {
claim_id: claim.id,
sources_checked: len(sources_checked)
}
4. RETURN:
RETURN {
verdict: final_verdict,
confidence: confidence,
evidence: evidence_output,
correction: correction,
sources: list(sources_checked)
}
\`\`\`
## Termination Conditions
- **Success**: Verdict determined with confidence >= threshold
- **Failure**: N/A (always produces a verdict)
- **Timeout**: Max search time reached → return with available evidence
## Composition
### Can contain (nested loops)
- None (atomic verification loop)
### Can be contained by
- \`orchestration/queue-processor\` (batch fact-checking)
- Workflow commands (\`/fact-check-standalone\`)
### Parallelizable
- Yes: Independent claims can be verified in parallel
## Signals Emitted
| Signal | When | Payload |
|--------|------|---------|
| \`claim_verified\` | Claim is accurate | \`{ claim_id, confidence, evidence }\` |
| \`claim_contradicted\` | Claim conflicts with evidence | \`{ claim_id, contradiction }\` |
| \`claim_outdated\` | Claim was true but changed | \`{ claim_id, was_true_until }\` |
| \`claim_unverifiable\` | Cannot determine | \`{ claim_id, sources_checked }\` |
| \`correction_generated\` | Fix suggested | \`{ claim_id, correction }\` |
## Evidence Quality Factors
| Factor | Weight | Description |
|--------|--------|-------------|
| Source Authority | 0.3 | Official docs > blogs > forums |
| Recency | 0.25 | Newer > older |
| Relevance | 0.25 | Direct match > tangential |
| Specificity | 0.2 | Exact quote > general mention |
## Example Usage
\`\`\`markdown
## Fact-Check Documentation
FOR each claim in extracted_claims:
Execute @loops/verification/fact-checking.md with:
INPUT:
claim: claim
source_types: ["codebase", "documentation"]
confidence_threshold: 0.85
auto_correct: true
ON claim_verified:
MARK claim as verified
ON claim_contradicted:
IF correction.confidence > 0.8:
APPLY correction automatically
ELSE:
FLAG for manual review
ON claim_outdated:
APPLY correction with "updated" annotation
ON claim_unverifiable:
FLAG for manual verification
\`\`\`
`,
metadata: {
"type": "verification",
"speed": "medium",
"scope": "item",
"description": "Verify claims against sources of truth with confidence scoring.",
"interface_version": "loop-interface@1.0",
"inputs": [],
"outputs": [],
"signals": [],
"composition": {}
},
},
'integration-test': {
content: `# Integration Test Loop
Verify that components work together correctly.
**Version**: 1.0.0
**Interface**: loop-interface@1.0
## Classification
- **Type**: verification
- **Speed**: medium-slow (~1-5 minutes)
- **Scope**: collection (multiple components)
## Interface
### Inputs
| Name | Type | Required | Description |
|------|------|----------|-------------|
| components | List<Component> | yes | Components to test together |
| integration_points | List<IntegrationPoint> | no | Specific integrations to verify |
| test_scenarios | List<Scenario> | no | Custom test scenarios |
| environment | Environment | no | Test environment config |
### Outputs
| Name | Type | Description |
|------|------|-------------|
| passed | Boolean | Overall pass/fail |
| results | List<TestResult> | Result for each test |
| failures | List<Failure> | Failed tests with details |
| coverage | IntegrationCoverage | What integrations were tested |
| recommendations | List<Recommendation> | Suggested fixes or improvements |
### State
| Field | Type | Description |
|-------|------|-------------|
| tests_run | List<Test> | Tests executed |
| setup_state | SetupState | Test environment state |
## Types
\`\`\`typescript
type Component = {
name: string
type: "service" | "module" | "api" | "database"
entry_point: string
dependencies: List<string>
}
type IntegrationPoint = {
from: string // Component name
to: string // Component name
type: "api_call" | "event" | "data_flow" | "import"
contract: Contract
}
type Scenario = {
name: string
description: string
steps: List<Step>
expected_outcome: Outcome
cleanup: List<Action>
}
type TestResult = {
test_name: string
passed: boolean
duration_ms: number
output: string
error?: Error
}
\`\`\`
## OODA Phases
### OBSERVE
Analyze integration landscape:
\`\`\`
1. MAP component dependencies:
dependency_graph = {}
FOR component in components:
dependencies = analyze_dependencies(component)
dependency_graph[component.name] = dependencies
# Detect integration points if not provided
IF not integration_points:
integration_points = infer_integration_points(
components,
dependency_graph
)
2. IDENTIFY test scenarios:
IF test_scenarios:
scenarios = test_scenarios
ELSE:
scenarios = []
# Generate scenarios from integration points
FOR point in integration_points:
scenarios.append(generate_scenario(point))
# Add cross-cutting scenarios
scenarios.extend([
happy_path_scenario(components),
error_propagation_scenario(components),
concurrent_access_scenario(components)
])
3. ASSESS environment:
IF environment:
env_ready = verify_environment(environment)
ELSE:
# Detect or create test environment
environment = detect_test_environment()
IF not environment:
environment = create_isolated_environment(components)
IF not env_ready:
env_issues = diagnose_environment(environment)
4. LOAD existing tests:
existing_tests = find_integration_tests(components)
FOR test in existing_tests:
relevant_scenarios = match_to_scenarios(test, scenarios)
IF relevant_scenarios:
test.covers = relevant_scenarios
tests_available.append(test)
SIGNALS:
integration_points: all integrations to test
scenarios: test scenarios to run
environment: test environment status
existing_tests: relevant tests found
\`\`\`
### ORIENT
Plan test execution:
\`\`\`
1. ORDER tests by dependency:
# Test foundation components first
test_order = topological_sort(
scenarios,
by=lambda s: dependency_depth(s.components)
)
2. GROUP related tests:
test_groups = {
"unit_integration": [], # Single integration point
"flow_integration": [], # Multi-step flows
"stress_integration": [], # Concurrent/load tests
"error_handling": [] # Error scenarios
}
FOR scenario in scenarios:
group = classify_scenario(scenario)
test_groups[group].append(scenario)
3. ESTIMATE resources:
FOR scenario in scenarios:
scenario.estimated_duration = estimate_duration(scenario)
scenario.resource_requirements = estimate_resources(scenario)
total_duration = sum(s.estimated_duration for s in scenarios)
IF total_duration > time_budget:
# Prioritize critical paths
scenarios = prioritize_scenarios(scenarios, time_budget)
4. PLAN setup/teardown:
setup_actions = []
teardown_actions = []
FOR component in components:
IF needs_setup(component):
setup_actions.append(generate_setup(component))
IF needs_teardown(component):
teardown_actions.append(generate_teardown(component))
\`\`\`
### DECIDE
Commit to test execution strategy:
\`\`\`
1. VALIDATE prerequisites:
blockers = []
IF not environment.ready:
blockers.append("Environment not ready")
FOR component in components:
IF not component.accessible:
blockers.append(f"{component.name} not accessible")
IF blockers:
decision = "BLOCK"
rationale = blockers
2. SELECT execution mode:
IF len(scenarios) < 10:
execution_mode = "sequential"
ELIF all(s.independent for s in scenarios):
execution_mode = "parallel"
ELSE:
execution_mode = "mixed"
# Sequential for dependent, parallel for independent
3. SET failure handling:
failure_strategy = {
"fail_fast": False, # Continue on failure
"retry_count": 1, # Retry flaky tests once
"quarantine": [], # Known flaky tests
"critical_paths": [...] # Must pass
}
\`\`\`
### ACT
Execute integration tests:
\`\`\`
1. SETUP environment:
FOR action in setup_actions:
result = execute_setup(action)
IF not result.success:
EMIT setup_failed { action, error: result.error }
RETURN { passed: False, failures: [setup_failure] }
setup_state = capture_state(environment)
2. RUN tests:
results = []
failures = []
IF execution_mode == "sequential":
FOR scenario in test_order:
result = run_scenario(scenario)
results.append(result)
tests_run.append(scenario)
IF not result.passed:
failures.append({
scenario: scenario.name,
error: result.error,
output: result.output,
duration: result.duration_ms
})
IF scenario in failure_strategy.critical_paths:
IF failure_strategy.fail_fast:
BREAK
ELIF execution_mode == "parallel":
# Run independent tests concurrently
results = parallel_execute(scenarios, max_concurrency=4)
failures = filter(results, passed == False)
ELIF execution_mode == "mixed":
# Execute in waves
FOR wave in partition_by_dependencies(scenarios):
wave_results = parallel_execute(wave)
results.extend(wave_results)
failures.extend(filter(wave_results, passed == False))
3. ANALYZE failures:
FOR failure in failures:
failure.analysis = analyze_failure(
scenario: failure.scenario,
error: failure.error,
logs: collect_logs(failure.scenario.components)
)
failure.root_cause = identify_root_cause(failure.analysis)
IF can_suggest_fix(failure.root_cause):
recommendations.append({
failure: failure.scenario,
fix: generate_fix_suggestion(failure.root_cause)
})
4. TEARDOWN:
FOR action in teardown_actions:
execute_teardown(action)
restore_state(setup_state)
5. CALCULATE coverage:
coverage = {
integration_points_tested: len(set(
point for scenario in tests_run
for point in scenario.integration_points
)),
integration_points_total: len(integration_points),
coverage_percentage: tested / total,
untested: [
point for point in integration_points
if point not in tested_points
]
}
6. EMIT results:
passed = len(failures) == 0
IF passed:
EMIT integration_passed {
tests_run: len(tests_run),
duration_ms: total_duration,
coverage: coverage.coverage_percentage
}
ELSE:
EMIT integration_failed {
tests_run: len(tests_run),
failures: len(failures),
critical_failures: count_critical(failures)
}
7. RETURN:
RETURN {
passed: passed,
results: results,
failures: failures,
coverage: coverage,
recommendations: recommendations
}
\`\`\`
## Termination Conditions
- **Success**: All tests pass
- **Failure**: Any test fails (configurable with fail_fast)
- **Timeout**: Test execution exceeds time budget
## Composition
### Can contain (nested loops)
- None (executes external tests)
### Can be contained by
- \`orchestration/queue-processor\` (milestone verification)
- \`verification/acceptance-gate\` (as verification step)
- Workflow commands (\`/spec-orchestrator\`)
### Parallelizable
- Conditional: Independent test scenarios can run in parallel
- No: Tests with shared state must be sequential
## Signals Emitted
| Signal | When | Payload |
|--------|------|---------|
| \`setup_started\` | Environment setup begins | \`{ components }\` |
| \`setup_failed\` | Setup fails | \`{ action, error }\` |
| \`test_started\` | Individual test starts | \`{ scenario }\` |
| \`test_passed\` | Individual test passes | \`{ scenario, duration }\` |
| \`test_failed\` | Individual test fails | \`{ scenario, error }\` |
| \`integration_passed\` | All tests pass | \`{ tests_run, coverage }\` |
| \`integration_failed\` | Tests fail | \`{ failures, critical }\` |
## Scenario Templates
### API Integration Scenario
\`\`\`yaml
name: "User API → Auth Service Integration"
steps:
- action: "POST /api/users/login"
with:
email: "test@example.com"
password: "test123"
- verify:
status: 200
body.token: exists
- action: "GET /api/users/me"
with:
headers:
Authorization: "Bearer \${previous.body.token}"
- verify:
status: 200
body.email: "test@example.com"
cleanup:
- action: "DELETE test user"
\`\`\`
### Event Flow Scenario
\`\`\`yaml
name: "Order → Payment → Notification Flow"
steps:
- action: "Create order"
- wait_for_event: "OrderCreated"
- verify: "Payment service received order"
- action: "Complete payment"
- wait_for_event: "PaymentCompleted"
- verify: "Notification sent to user"
expected_duration_ms: 5000
\`\`\`
## Example Usage
\`\`\`markdown
## Integration Verification
Execute @loops/verification/integration-test.md with:
INPUT:
components: [auth_service, user_api, notification_service]
integration_points: discovered_integrations
environment: { type: "docker", config: "docker-compose.test.yml" }
ON setup_failed:
HALT with environment error
ON test_failed:
LOG failure details
IF failure.critical:
HALT workflow
ON integration_passed:
PROCEED to next phase
ON integration_failed:
FOR each recommendation:
ATTEMPT fix
RE-RUN failed tests
\`\`\`
`,
metadata: {
"type": "verification",
"speed": "medium-slow",
"scope": "collection",
"description": "Verify that components work together correctly.",
"interface_version": "loop-interface@1.0",
"inputs": [],
"outputs": [],
"signals": [],
"composition": {}
},
}
}
} as const;
/**
 * List every loop category present in the embedded catalog.
 *
 * @returns Category names, in catalog insertion order.
 */
export function getCategories(): string[] {
  // Object.entries preserves the same key order as Object.keys.
  return Object.entries(LOOPS_CATALOG).map(([category]) => category);
}
/**
 * List the names of all loops registered under a category.
 *
 * @param category - Category key, e.g. "authoring" or "verification".
 * @returns Loop names within that category, in catalog insertion order.
 * @throws Error if the category does not exist; the message enumerates
 *   the valid categories to aid debugging.
 */
export function getLoopsInCategory(category: string): string[] {
  const loops = LOOPS_CATALOG[category];
  if (loops === undefined) {
    throw new Error(`Category not found: ${category}. Available: ${getCategories().join(", ")}`);
  }
  return Object.keys(loops);
}
/**
 * Look up a single loop definition by category and name.
 *
 * @param category - Category key, e.g. "verification".
 * @param name - Loop key within the category, e.g. "integration-test".
 * @returns The matching {@link Loop} (markdown content plus metadata).
 * @throws Error if either the category or the loop is missing; each
 *   message lists the valid alternatives at that level.
 */
export function getLoop(category: string, name: string): Loop {
  const categoryLoops = LOOPS_CATALOG[category];
  if (categoryLoops === undefined) {
    throw new Error(`Category not found: ${category}. Available: ${getCategories().join(", ")}`);
  }
  const found = categoryLoops[name];
  if (found === undefined) {
    throw new Error(`Loop not found: ${name} in category ${category}. Available: ${Object.keys(categoryLoops).join(", ")}`);
  }
  return found;
}