"""
Session State Management for Phase-Gated Execution.
v1.0: Code Intelligence MCP Server
- Phase-gated execution: EXPLORATION → SEMANTIC → VERIFICATION → READY
- Server-side confidence calculation (no LLM self-reporting)
- QueryFrame for structured natural language processing
- ChromaDB-based semantic search (Forest/Map architecture)
v1.1: Context-Aware Guardrails
- Relaxed requirements for markup files (.html, .css, .blade.php, etc.)
- Conditional removal of find_references/find_definitions requirements
- Risk mitigation for missing trigger_condition
"""
import json
import os
import subprocess
import tempfile
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
from pathlib import Path
from typing import Literal
import yaml
# =============================================================================
# v1.10: Removed Context-Aware Functions
# =============================================================================
# The following functions were REMOVED in v1.10 (no backward compatibility):
# - extract_extensions_from_text(): LLM decides instead of file extension detection
# - is_markup_context(): LLM decides phase necessity instead of automatic relaxation
# - MARKUP_EXTENSIONS: No longer needed (LLM judges based on actual content)
# - assess_risk_level() (in query_frame.py): Moved to LLM judgment via check_phase_necessity
# - get_exploration_requirements() (in query_frame.py): Replaced by individual phase checks
# v1.10 philosophy: Let LLM see actual code and decide necessity for each phase,
# rather than pre-judging based on file extensions or QueryFrame content.
class Phase(Enum):
    """Execution phases in order (v1.11: unified submit_phase).

    Members use auto(), so declaration order defines the workflow order.
    Q1/Q2/Q3 are gate questions that decide whether the immediately
    following phase runs at all.
    """
    BRANCH_INTERVENTION = auto()  # Step 2: stale branch intervention (v1.11)
    DOCUMENT_RESEARCH = auto()  # Step 3: design document research (v1.11)
    QUERY_FRAME = auto()  # Step 4: NL → structured slot extraction (v1.11)
    EXPLORATION = auto()  # Step 5: code-intel exploration
    Q1 = auto()  # Step 6: SEMANTIC necessity evaluation (v1.11)
    SEMANTIC = auto()  # Step 7: semantic search (if Q1=YES)
    Q2 = auto()  # Step 8: VERIFICATION necessity evaluation (v1.11)
    VERIFICATION = auto()  # Step 9: verify hypotheses (if Q2=YES)
    Q3 = auto()  # Step 10: IMPACT_ANALYSIS necessity evaluation (v1.11)
    IMPACT_ANALYSIS = auto()  # Step 11: impact analysis (if Q3=YES)
    READY = auto()  # Steps 12-14: task planning, implementation, completion
    POST_IMPL_VERIFY = auto()  # Step 15: post-implementation verification (v1.11)
    VERIFY_INTERVENTION = auto()  # Step 16: intervention for 3x failures (v1.11)
    PRE_COMMIT = auto()  # Step 17: garbage detection before commit
    QUALITY_REVIEW = auto()  # Step 18: quality check before merge
    MERGE = auto()  # Step 19: merge to base (v1.11)
class SemanticReason(Enum):
    """
    Reasons for entering SEMANTIC phase (semantic search).

    Must be one of these - no free text allowed. The chosen reason is
    cross-checked against the actual missing requirements in
    validate_semantic_reason().
    """
    NO_DEFINITION_FOUND = "no_definition_found"
    NO_REFERENCE_FOUND = "no_reference_found"
    NO_SIMILAR_IMPLEMENTATION = "no_similar_implementation"
    ARCHITECTURE_UNKNOWN = "architecture_unknown"
    CONTEXT_FRAGMENTED = "context_fragmented"
class IntentReclassificationRequired(Exception):
    """Raised when Intent needs to be re-evaluated before Write."""
    pass


class InvalidSemanticReason(Exception):
    """Raised when semantic_reason doesn't match missing_requirements."""
    pass


class WriteTargetBlocked(Exception):
    """Raised when Write target was not explored."""
    pass
# =============================================================================
# Exploration Evaluation (Server-side confidence calculation)
# =============================================================================
# Minimum requirements for EXPLORATION to be considered "high" confidence
# NOTE: earlier revisions defined MIN_EXPLORATION_REQUIREMENTS and
# STRICT_EXPLORATION_REQUIREMENTS twice; the first definitions were
# immediately shadowed by the second ones. Only the effective (final)
# values are kept below — behavior is unchanged.

# =============================================================================
# Gate Level (v1.10: Simplified to 2 levels)
# =============================================================================
# Gate level determines whether to check phase necessity
# v1.10: Removed GATE_LEVEL_REQUIREMENTS - LLM decides phase necessity instead
# Valid values: "full" (execute all phases) or "auto" (check before each phase)
# v1.10 note: The following constants are REMOVED in v1.10:
# - GATE_LEVEL_REQUIREMENTS (5 levels → 2 levels)
# - is_markup_context() (LLM decides instead of file extension)
# - extract_extensions_from_text() (no longer needed)
# - assess_risk_level() (moved to LLM judgment)
# - get_exploration_requirements() (no longer needed)
# Old MIN/STRICT requirements kept for compatibility with evaluate_exploration_v36
# (will be simplified in future cleanup)

# Minimum requirements for non-IMPLEMENT intents to reach "high" confidence
MIN_EXPLORATION_REQUIREMENTS = {
    "symbols_identified": 1,
    "entry_points": 0,
    "files_analyzed": 1,
    "existing_patterns": 0,
    "required_tools": {"find_definitions"},
}
# Strict requirements for IMPLEMENT/MODIFY to proceed to READY
STRICT_EXPLORATION_REQUIREMENTS = {
    "symbols_identified": 3,
    "entry_points": 1,
    "files_analyzed": 2,
    "required_tools": {"find_definitions", "find_references"},
    "existing_patterns": 1,
}
def evaluate_exploration(
    result: "ExplorationResult",
    intent: str,
    gate_level: str = "high",
) -> tuple[str, list[str]]:
    """Compute exploration confidence on the server side.

    Confidence is derived mechanically from the artifacts, never from LLM
    self-reporting.

    Args:
        result: Exploration result artifacts.
        intent: IMPLEMENT, MODIFY, INVESTIGATE, or QUESTION.
        gate_level: Accepted for API compatibility; this implementation does
            not consult it — requirements are chosen from intent alone.
            ("none" should not reach here; it is skipped in create_session.)

    Returns:
        ("high" | "low", list of unmet requirement descriptions)
    """
    # IMPLEMENT/MODIFY must meet the strict thresholds; read-only intents
    # only need the minimal set.
    writing = intent in ("IMPLEMENT", "MODIFY")
    reqs = STRICT_EXPLORATION_REQUIREMENTS if writing else MIN_EXPLORATION_REQUIREMENTS
    missing: list[str] = []

    def shortfall(name: str, actual: int, needed: int) -> None:
        # Record unmet requirements in the "name: actual/needed" format
        # that downstream code parses (see validate_semantic_reason).
        if actual < needed:
            missing.append(f"{name}: {actual}/{needed}")

    shortfall("symbols_identified", len(result.symbols_identified), reqs["symbols_identified"])
    if reqs.get("entry_points", 0) > 0:
        shortfall("entry_points", len(result.entry_points), reqs["entry_points"])
    shortfall("files_analyzed", len(result.files_analyzed), reqs["files_analyzed"])
    unused_tools = reqs["required_tools"] - set(result.tools_used)
    if unused_tools:
        missing.append(f"required_tools: missing {unused_tools}")
    if reqs.get("existing_patterns", 0) > 0:
        shortfall("existing_patterns", len(result.existing_patterns), reqs["existing_patterns"])
    return ("high" if not missing else "low"), missing
def can_proceed_to_ready(
    result: "ExplorationResult",
    intent: str,
    gate_level: str = "high",
) -> tuple[bool, list[str]]:
    """Decide whether the session may advance to READY.

    IMPLEMENT/MODIFY intents must satisfy the strict exploration
    requirements; everything else passes unconditionally.

    Args:
        result: Exploration result artifacts.
        intent: IMPLEMENT, MODIFY, INVESTIGATE, or QUESTION.
        gate_level: v1.2 gate level; "none" sessions are already at READY
            and are never gated here.

    Returns:
        (can_proceed, missing_requirements)
    """
    # "none" skips gating entirely; read-only intents (INVESTIGATE /
    # QUESTION) can proceed with minimal exploration.
    if gate_level == "none" or intent not in ("IMPLEMENT", "MODIFY"):
        return True, []
    confidence, missing = evaluate_exploration(result, intent, gate_level)
    return confidence == "high", missing
# =============================================================================
# Dynamic Exploration Requirements based on risk_level
# =============================================================================
# v1.10: get_dynamic_requirements() removed - inlined into evaluate_exploration_v36()
def evaluate_exploration_v36(
    result: "ExplorationResult",
    intent: str,
    risk_level: str = "LOW",
    query_frame: "QueryFrame | None" = None,
    gate_level: str = "auto",
) -> tuple[str, list[str]]:
    """Evaluate exploration artifacts against risk-scaled thresholds.

    v1.1: relaxed requirements for markup-only files (auto-relaxation was
    removed in v1.10 — the LLM now judges phase necessity via
    check_phase_necessity instead of file-extension detection).
    v1.2: gate_level allows explicit specification of requirement level.

    Args:
        result: Exploration result artifacts.
        intent: IMPLEMENT, MODIFY, INVESTIGATE, or QUESTION.
        risk_level: HIGH, MEDIUM, or LOW; scales thresholds for
            IMPLEMENT/MODIFY (unknown values fall back to LOW).
        query_frame: QueryFrame used for the slot-evidence check.
        gate_level: "full" or "auto" (v1.10); thresholds here are derived
            from intent and risk_level.

    Returns:
        ("high" | "low", list of unmet requirement descriptions)
    """
    # Pick (count thresholds, slots that must be evidenced) per situation.
    if intent not in ("IMPLEMENT", "MODIFY"):
        thresholds = {
            "symbols_identified": 1,
            "entry_points": 0,
            "files_analyzed": 1,
            "existing_patterns": 0,
        }
        required_slots: list[str] = []
    else:
        by_risk = {
            "HIGH": (
                {"symbols_identified": 5, "entry_points": 2, "files_analyzed": 4, "existing_patterns": 2},
                ["target_feature", "observed_issue"],
            ),
            "MEDIUM": (
                {"symbols_identified": 3, "entry_points": 1, "files_analyzed": 2, "existing_patterns": 1},
                ["target_feature"],
            ),
            "LOW": (
                {"symbols_identified": 3, "entry_points": 1, "files_analyzed": 2, "existing_patterns": 1},
                [],
            ),
        }
        thresholds, required_slots = by_risk.get(risk_level, by_risk["LOW"])

    # Compare observed artifact counts against the thresholds.
    counts = {
        "symbols_identified": len(result.symbols_identified),
        "entry_points": len(result.entry_points),
        "files_analyzed": len(result.files_analyzed),
        "existing_patterns": len(result.existing_patterns),
    }
    missing = [
        f"{metric}: {counts[metric]}/{needed}"
        for metric, needed in thresholds.items()
        if counts[metric] < needed
    ]

    # v1.10: required tools check (markup-detection relaxation removed) —
    # both exploration tools must have been used.
    required_tools = {"find_definitions", "find_references"}
    not_used = required_tools - set(result.tools_used)
    if not_used:
        missing.append(f"required_tools: missing {not_used}")

    # v3.6: slot-evidence check — the exploration must tie back to the
    # QueryFrame slots for risky work.
    if query_frame and required_slots:
        missing.extend(
            f"slot_evidence: {slot} not evidenced"
            for slot in required_slots
            if slot not in query_frame.slot_evidence
        )
    return ("high" if not missing else "low"), missing
# =============================================================================
# Exploration Consistency Check (semantic consistency, not just quantity)
# =============================================================================
def validate_exploration_consistency(result: "ExplorationResult") -> list[str]:
    """Cross-check exploration artifacts for mutual consistency.

    Prevents the LLM from submitting an exploration that formally meets the
    quantity thresholds but is semantically meaningless (padding, unlinked
    entry points, patterns without analyzed files).

    Returns:
        List of consistency errors (empty if consistent).
    """
    errors: list[str] = []
    symbols = result.symbols_identified

    # Every entry point (typically "Class.method()" or "function()") must
    # relate to some identified symbol.
    for ep in result.entry_points:
        base_name = ep.split(".")[0].split("(")[0].strip()
        linked = any(base_name in sym or sym in base_name for sym in symbols)
        if not linked:
            errors.append(f"entry_point '{ep}' not linked to any symbol in symbols_identified")

    # Patterns cannot exist without at least one analyzed file.
    if result.existing_patterns and not result.files_analyzed:
        errors.append("patterns provided but no files analyzed")

    # Duplicate detection (anti-padding) for symbols and files.
    for label, values in (("symbols", symbols), ("files", result.files_analyzed)):
        distinct = set(values)
        if len(distinct) < len(values):
            errors.append(f"duplicate {label} detected: {len(values)} given, {len(distinct)} unique")
    return errors
# =============================================================================
# SEMANTIC Reason Validation (maps each kind of exploration failure to its allowed reasons)
# =============================================================================
# Allowed semantic_reason values per missing_requirements key.
# Keyed by the requirement name that failed; each value is the set of
# SemanticReason members that legitimately explain that failure.
SEMANTIC_ALLOWED_REASONS_BY_MISSING = {
    "symbols_identified": {
        SemanticReason.NO_DEFINITION_FOUND,
        SemanticReason.ARCHITECTURE_UNKNOWN,
    },
    "entry_points": {
        SemanticReason.NO_DEFINITION_FOUND,
        SemanticReason.NO_REFERENCE_FOUND,
    },
    "existing_patterns": {
        SemanticReason.NO_SIMILAR_IMPLEMENTATION,
        SemanticReason.ARCHITECTURE_UNKNOWN,
    },
    "files_analyzed": {
        SemanticReason.CONTEXT_FRAGMENTED,
        SemanticReason.ARCHITECTURE_UNKNOWN,
    },
    # required_tools: not having used a tool is never a reason to go
    # SEMANTIC (just use the tool); if the tool was used but found nothing,
    # one of the other reasons applies instead.
    "required_tools": set(),
}
def validate_semantic_reason(
    missing_requirements: list[str],
    semantic_reason: "SemanticReason",
) -> tuple[bool, str]:
    """Check that semantic_reason corresponds to the missing requirements.

    Prevents "escaping" into the SEMANTIC phase as an excuse to skip real
    exploration work.

    Returns:
        (is_valid, error_message) — error_message is "" when valid.
    """
    if not missing_requirements:
        # Theoretically unreachable: SEMANTIC entered with nothing missing.
        return False, "No missing requirements but entered SEMANTIC phase"
    # Collect the reasons permitted by the concrete shortfalls.
    # CONTEXT_FRAGMENTED and ARCHITECTURE_UNKNOWN are generically allowed.
    allowed: set[SemanticReason] = {
        SemanticReason.CONTEXT_FRAGMENTED,
        SemanticReason.ARCHITECTURE_UNKNOWN,
    }
    for entry in missing_requirements:
        # "symbols_identified: 1/3" → key "symbols_identified"
        key = entry.split(":")[0].strip()
        allowed |= SEMANTIC_ALLOWED_REASONS_BY_MISSING.get(key, set())
    if semantic_reason in allowed:
        return True, ""
    return False, (
        f"semantic_reason '{semantic_reason.value}' is not allowed for missing: {missing_requirements}. "
        f"Allowed reasons: {[r.value for r in allowed]}"
    )
# =============================================================================
# Write Target Validation (restricts writes to the explored scope)
# =============================================================================
def validate_write_target(
    file_path: str,
    explored_files: set[str],
    allow_new_files: bool = True,
) -> tuple[bool, str]:
    """Check that a write target was covered by exploration.

    Physically enforces "do not write code you have not looked at".

    Args:
        file_path: Path of the file about to be written.
        explored_files: Set of file paths already explored.
        allow_new_files: Whether creating a brand-new file is allowed.

    Returns:
        (is_valid, error_message) — error_message is "" when valid.
    """
    # (fixed: this function previously re-imported `os` locally even though
    # it is already imported at module level)

    # New-file case: allowed only if the parent directory was explored
    # (or nothing has been explored yet).
    if not os.path.exists(file_path):
        if not allow_new_files:
            return False, f"New file creation not allowed: {file_path}"
        parent_dir = os.path.dirname(file_path)
        explored_dirs = {os.path.dirname(f) for f in explored_files}
        if parent_dir in explored_dirs or not explored_dirs:
            return True, ""
        return False, (
            f"New file '{file_path}' is in unexplored directory. "
            f"Explored directories: {explored_dirs}"
        )

    # Existing file: normalize paths to absorb relative/absolute variants.
    normalized_path = os.path.normpath(file_path)
    normalized_explored = {os.path.normpath(f) for f in explored_files}
    if normalized_path in normalized_explored:
        return True, ""

    # Last-resort match on basename alone, to rescue differently-spelled
    # paths. NOTE(review): this also accepts a same-named file in another
    # directory — intentional leniency, kept as-is.
    file_name = os.path.basename(normalized_path)
    explored_names = {os.path.basename(f) for f in explored_files}
    if file_name in explored_names:
        return True, ""
    return False, (
        f"File '{file_path}' was not explored. "
        f"Run EXPLORATION first or add to files_analyzed. "
        f"Explored files: {explored_files}"
    )
# =============================================================================
# v1.11: Task Model for Task Orchestration (READY phase)
# =============================================================================
@dataclass
@dataclass
class ChecklistItem:
"""
v1.16: Represents a single implementation item within a task.
status:
- pending: Initial state
- done: Completed (evidence required)
- skipped: Not implemented (reason required)
"""
item: str
status: str = "pending" # pending, done, skipped
evidence: str | None = None # Required when status="done", format: "file.py:42" or "file.py:42-58"
reason: str | None = None # Required when status="skipped", min 10 chars
def to_dict(self) -> dict:
d = {
"item": self.item,
"status": self.status,
}
if self.evidence:
d["evidence"] = self.evidence
if self.reason:
d["reason"] = self.reason
return d
@classmethod
def from_dict(cls, data: dict) -> "ChecklistItem":
return cls(
item=data.get("item", ""),
status=data.get("status", "pending"),
evidence=data.get("evidence"),
reason=data.get("reason"),
)
@dataclass
class TaskModel:
    """
    v1.11: Represents a single implementation task within READY phase.
    v1.16: Added checklist for tracking individual implementation items.

    failure_count is per-task (not session-level).
    revert_reason records why this task was added during revert.
    checklist: List of specific implementation items to track completion.
    """
    id: str
    description: str
    status: str = "pending"  # pending, completed
    failure_count: int = 0
    revert_reason: str | None = None
    checklist: list[ChecklistItem] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Serialize; optional fields are omitted when zero/empty."""
        d = {
            "id": self.id,
            "description": self.description,
            "status": self.status,
        }
        if self.failure_count > 0:
            d["failure_count"] = self.failure_count
        if self.revert_reason:
            d["revert_reason"] = self.revert_reason
        if self.checklist:
            d["checklist"] = [item.to_dict() for item in self.checklist]
        return d

    @classmethod
    def from_dict(cls, data: dict) -> "TaskModel":
        """Deserialize; missing keys fall back to safe defaults."""
        checklist = []
        if "checklist" in data:
            checklist = [ChecklistItem.from_dict(item) for item in data["checklist"]]
        return cls(
            id=data.get("id", ""),
            description=data.get("description", ""),
            status=data.get("status", "pending"),
            failure_count=data.get("failure_count", 0),
            revert_reason=data.get("revert_reason"),
            checklist=checklist,
        )
# =============================================================================
# v1.11: Expected Payload Definitions
# =============================================================================
# Phase → step number mapping
# Maps phase name → workflow step number (mirrors the Phase enum comments).
PHASE_STEP_MAP: dict[str, int] = {
    "BRANCH_INTERVENTION": 2,
    "DOCUMENT_RESEARCH": 3,
    "QUERY_FRAME": 4,
    "EXPLORATION": 5,
    "Q1": 6,
    "SEMANTIC": 7,
    "Q2": 8,
    "VERIFICATION": 9,
    "Q3": 10,
    "IMPACT_ANALYSIS": 11,
    "READY": 12,  # Planning step (may be 13/14 depending on sub-step)
    "POST_IMPL_VERIFY": 15,
    "VERIFY_INTERVENTION": 16,
    "PRE_COMMIT": 17,
    "QUALITY_REVIEW": 18,
    "MERGE": 19,
}

# Per-path caches of parsed phase_contract.yml, invalidated by file mtime.
# _PHASE_CONTRACT_* holds only the normalized "phases" section;
# _FULL_CONTRACT_* holds the complete YAML document.
_PHASE_CONTRACT_CACHE: dict[str, dict] = {}
_PHASE_CONTRACT_MTIME: dict[str, float] = {}
_FULL_CONTRACT_CACHE: dict[str, dict] = {}
_FULL_CONTRACT_MTIME: dict[str, float] = {}
def _normalize_phase_contract(data: dict) -> dict[str, dict]:
if not isinstance(data, dict):
return {}
phases = data.get("phases")
if isinstance(phases, dict):
data = phases
normalized: dict[str, dict] = {}
for key, value in data.items():
if isinstance(value, dict):
normalized[str(key)] = value
return normalized
class PhaseContractMissingError(Exception):
    """Raised when phase_contract.yml cannot be found, read, or parsed."""
    pass
def _get_mcp_server_root() -> Path:
"""MCP サーバーのルートディレクトリを取得"""
# tools/session.py → llm-helper/
return Path(__file__).parent.parent
def _load_contract_data(
    cache: dict[str, dict],
    mtime_cache: dict[str, float],
    transform,
) -> dict:
    """Shared loader for the MCP server's .code-intel/phase_contract.yml.

    phase_contract.yml is the MCP server's own configuration, so it is read
    from the server directory, not from the target project (repo_path).
    Results are cached per path and invalidated by file mtime.

    Args:
        cache: Per-path result cache to use.
        mtime_cache: Per-path mtime cache matching `cache`.
        transform: Callable applied to the parsed YAML before caching.

    Raises:
        PhaseContractMissingError: file missing, unreadable, or unparsable.
    """
    contract_path = _get_mcp_server_root() / ".code-intel" / "phase_contract.yml"
    if not contract_path.exists():
        raise PhaseContractMissingError(
            f"phase_contract.yml not found at {contract_path}. "
            f"Please run init-project.sh or copy from templates/code-intel/"
        )
    cache_key = str(contract_path)
    try:
        mtime = contract_path.stat().st_mtime
    except OSError as e:
        raise PhaseContractMissingError(f"Cannot read phase_contract.yml: {e}")
    cached = cache.get(cache_key)
    if cached is not None and mtime_cache.get(cache_key) == mtime:
        return cached
    try:
        raw = contract_path.read_text(encoding="utf-8")
        data = yaml.safe_load(raw)
    except Exception as e:
        raise PhaseContractMissingError(f"Failed to parse phase_contract.yml: {e}")
    value = transform(data if data else {})
    cache[cache_key] = value
    mtime_cache[cache_key] = mtime
    return value


def _load_phase_contract() -> dict[str, dict]:
    """Load the normalized "phases" section of phase_contract.yml.

    Raises:
        PhaseContractMissingError: phase_contract.yml is missing/broken.
    """
    return _load_contract_data(
        _PHASE_CONTRACT_CACHE, _PHASE_CONTRACT_MTIME, _normalize_phase_contract
    )


def _load_full_phase_contract() -> dict:
    """v1.15: Load phase_contract.yml in full.

    Unlike _load_phase_contract(), this returns the complete YAML data
    including failures, common_failures, success, tool_errors, warnings,
    etc. — not just the phases section.

    Returns:
        Complete YAML data as dict.

    Raises:
        PhaseContractMissingError: phase_contract.yml is missing/broken.
    """
    return _load_contract_data(
        _FULL_CONTRACT_CACHE, _FULL_CONTRACT_MTIME, lambda data: data
    )
def _get_message(
    category: str,
    phase_or_tool: str | None,
    key: str,
    **kwargs,
) -> dict:
    """v1.15: Fetch a message definition from phase_contract.yml.

    Args:
        category: "failures", "common_failures", "success", "tool_errors",
            "session_messages", "hints", "query_frame_hints", "warnings".
        phase_or_tool: Phase or tool name; None for flat categories
            (common_failures, session_messages, hints, warnings).
        key: Message key within the section.
        **kwargs: Placeholder substitutions applied to the message text.

    Example:
        _get_message("failures", "READY", "unknown_task", task_id="task1")

    Returns:
        Dict of the form {"error": str, "message": str, ...}; when lookup
        fails, {"error": "message_not_found", "message": key} (or a
        similar sentinel) is returned instead.
    """
    try:
        contract = _load_full_phase_contract()
    except PhaseContractMissingError:
        return {"error": "contract_not_loaded", "message": key}
    section = contract.get(category, {})
    if not section:
        return {"error": "section_not_found", "message": key}
    # Flat categories are keyed directly; others nest per phase/tool.
    if phase_or_tool:
        message_def = section.get(phase_or_tool, {}).get(key, {})
    else:
        message_def = section.get(key, {})
    if not message_def:
        return {"error": "message_not_found", "message": key}
    # Copy the relevant fields out of the definition.
    result: dict = {}
    if "error" in message_def:
        result["error"] = message_def["error"]
    if "message" in message_def:
        text = message_def["message"]
        if kwargs:
            try:
                text = text.format(**kwargs)
            except (KeyError, ValueError):
                # On substitution failure, keep the raw template.
                pass
        result["message"] = text
    for passthrough in ("instruction", "requires_user_intervention"):
        if passthrough in message_def:
            result[passthrough] = message_def[passthrough]
    return result
def get_phase_response(
    phase_name: str,
    step: int | None = None,
    extra: dict | None = None,
) -> dict:
    """Build a self-contained response for a phase.

    Every response carries instruction + expected_payload so the LLM always
    knows what to do next, even after context compaction.

    Raises:
        PhaseContractMissingError: phase_contract.yml could not be loaded.
        ValueError: the phase is missing or incomplete in the contract.
    """
    contract = _load_phase_contract()
    resolved_step = PHASE_STEP_MAP.get(phase_name, 0) if step is None else step
    # READY has sub-steps; the caller selects READY_PLAN / READY_IMPL /
    # READY_COMPLETE via extra["ready_substep"].
    payload_key = phase_name
    if phase_name == "READY":
        payload_key = (extra or {}).get("ready_substep", "READY_PLAN")
    # No fallback defaults — phase_contract.yml must be complete.
    phase_config = contract.get(payload_key) or contract.get(phase_name)
    if not phase_config:
        raise ValueError(f"Phase '{phase_name}' (payload_key='{payload_key}') not found in phase_contract.yml")
    instruction = phase_config.get("instruction")
    if not instruction:
        raise ValueError(f"Phase '{phase_name}' missing 'instruction' in phase_contract.yml")
    expected_payload = phase_config.get("expected_payload")
    if expected_payload is None:
        raise ValueError(f"Phase '{phase_name}' missing 'expected_payload' in phase_contract.yml")
    response = {
        "phase": phase_name,
        "step": resolved_step,
        "instruction": instruction,
        "expected_payload": expected_payload,
        "call": "submit_phase",
    }
    if extra:
        response.update({k: v for k, v in extra.items() if k != "ready_substep"})
    # v1.13: the compaction note is appended AFTER merging `extra` so it
    # always ends up on the final instruction text.
    compaction_note = (
        "\n[compaction_protocol] Include compaction_count (integer) in submit_phase data. "
        "Send the same compaction_count from the previous submit_phase response. "
        "If context compaction occurred and you cannot access the previous response, "
        "send last known compaction_count + 1 (use 1 if unknown). "
        "If response contains phase_summaries, use them to restore context from previous phases."
    )
    response["instruction"] = response["instruction"] + compaction_note
    return response
# =============================================================================
# Data Classes
# =============================================================================
@dataclass
class ExplorationResult:
    """Phase 1 output: what was learned from code-intel exploration.

    Confidence is computed server-side (see evaluate_exploration); LLM
    self-reported confidence is not accepted.
    """
    symbols_identified: list[str] = field(default_factory=list)
    entry_points: list[str] = field(default_factory=list)
    existing_patterns: list[str] = field(default_factory=list)
    files_analyzed: list[str] = field(default_factory=list)
    tools_used: list[str] = field(default_factory=list)
    notes: str = ""
    # v3.3: populated by the server-side evaluator, never by the LLM
    _evaluated_confidence: str = field(default="", init=False)
    _missing_requirements: list[str] = field(default_factory=list, init=False)

    def to_dict(self) -> dict:
        """Serialize, exposing the server-computed confidence fields."""
        payload = dict(
            symbols_identified=self.symbols_identified,
            entry_points=self.entry_points,
            existing_patterns=self.existing_patterns,
            files_analyzed=self.files_analyzed,
            tools_used=self.tools_used,
            notes=self.notes,
        )
        payload["evaluated_confidence"] = self._evaluated_confidence
        payload["missing_requirements"] = self._missing_requirements
        return payload
@dataclass
class Hypothesis:
    """A structured hypothesis produced during semantic search.

    Keeping confidence structured lets the improvement cycle run analyses
    such as "low-confidence hypotheses fail more often".
    """
    text: str
    confidence: Literal["high", "medium", "low"] = "medium"

    def to_dict(self) -> dict:
        """Serialize to a plain dict."""
        return dict(text=self.text, confidence=self.confidence)
@dataclass
class SemanticResult:
    """
    Phase 2 output: hypotheses from semantic search.

    semantic_reason only accepts SemanticReason enum members (no free
    text); it is cross-checked in validate_semantic_reason().
    """
    hypotheses: list[Hypothesis] = field(default_factory=list)
    semantic_reason: SemanticReason | None = None
    search_queries: list[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Serialize; semantic_reason is flattened to its string value."""
        return {
            "hypotheses": [h.to_dict() for h in self.hypotheses],
            "semantic_reason": self.semantic_reason.value if self.semantic_reason else None,
            "search_queries": self.search_queries,
        }
@dataclass
class VerificationEvidence:
    """Structured evidence backing a verification claim.

    Making tool/target/result mandatory prevents "pretend verification".
    """
    tool: str  # tool used (find_definitions, find_references, etc.)
    target: str  # what was verified (symbol name, file name, ...)
    result: str  # summary of the tool output
    files: list[str] = field(default_factory=list)  # related files

    def to_dict(self) -> dict:
        """Serialize to a plain dict."""
        return dict(
            tool=self.tool,
            target=self.target,
            result=self.result,
            files=self.files,
        )
@dataclass
class VerifiedHypothesis:
    """
    A verified hypothesis.

    Structuring the evidence prevents "pretend verification" — the tool,
    target, and result used for the check are all required.
    """
    hypothesis: str
    status: Literal["confirmed", "rejected"]
    evidence: VerificationEvidence

    def to_dict(self) -> dict:
        """Serialize, nesting the evidence dict."""
        return {
            "hypothesis": self.hypothesis,
            "status": self.status,
            "evidence": self.evidence.to_dict(),
        }
@dataclass
class VerificationResult:
    """
    Phase 3 output: verified hypotheses.

    verified is a list of VerifiedHypothesis.
    """
    verified: list[VerifiedHypothesis] = field(default_factory=list)
    # Set by the caller; NOTE(review): presumably True only when every
    # hypothesis was confirmed — confirm against the submit handler.
    all_confirmed: bool = False

    def to_dict(self) -> dict:
        """Serialize, nesting each verified hypothesis."""
        return {
            "verified": [v.to_dict() for v in self.verified],
            "all_confirmed": self.all_confirmed,
        }
# =============================================================================
# v1.1: Impact Analysis Result
# =============================================================================
@dataclass
class VerifiedFile:
"""A file that was verified during impact analysis."""
file: str
status: str # will_modify, no_change_needed, not_affected
reason: str | None = None
def to_dict(self) -> dict:
return {
"file": self.file,
"status": self.status,
"reason": self.reason,
}
@dataclass
class ImpactAnalysisResult:
    """Result of impact analysis phase (v1.1)."""
    target_files: list[str] = field(default_factory=list)
    must_verify: list[str] = field(default_factory=list)
    should_verify: list[str] = field(default_factory=list)
    verified_files: list[VerifiedFile] = field(default_factory=list)
    inferred_from_rules: list[str] = field(default_factory=list)
    mode: str = "standard"  # standard or relaxed_markup

    def to_dict(self) -> dict:
        """Serialize, nesting each VerifiedFile."""
        return {
            "target_files": self.target_files,
            "must_verify": self.must_verify,
            "should_verify": self.should_verify,
            "verified_files": [v.to_dict() for v in self.verified_files],
            "inferred_from_rules": self.inferred_from_rules,
            "mode": self.mode,
        }
# =============================================================================
# v1.2: Pre-Commit Review Result (Garbage Detection)
# =============================================================================
@dataclass
class ReviewedFile:
"""A file reviewed during PRE_COMMIT phase."""
path: str
decision: str # keep, discard
reason: str | None = None # Required if discard
change_type: str = "modified" # added, modified, deleted
def to_dict(self) -> dict:
return {
"path": self.path,
"decision": self.decision,
"reason": self.reason,
"change_type": self.change_type,
}
@dataclass
class PreCommitReviewResult:
    """Result of PRE_COMMIT garbage detection phase.

    Aggregates the per-file keep/discard decisions made before commit.
    """
    total_changes: int = 0
    reviewed_files: list[ReviewedFile] = field(default_factory=list)
    kept_files: list[str] = field(default_factory=list)
    discarded_files: list[str] = field(default_factory=list)
    review_notes: str = ""

    def to_dict(self) -> dict:
        """Serialize, nesting each reviewed file."""
        return {
            "total_changes": self.total_changes,
            "reviewed_files": [f.to_dict() for f in self.reviewed_files],
            "kept_files": self.kept_files,
            "discarded_files": self.discarded_files,
            "review_notes": self.review_notes,
        }
# =============================================================================
# Valid tools for verification evidence
# =============================================================================
# Tools accepted as verification evidence; anything else is rejected.
VALID_VERIFICATION_TOOLS = {
    "find_definitions",
    "find_references",
    "search_text",
    "analyze_structure",
    "query",
}


def validate_verification_evidence(evidence: "VerificationEvidence") -> tuple[bool, str]:
    """Check that a piece of verification evidence is well-formed.

    Returns:
        (is_valid, error_message) — error_message is "" when valid.
    """
    if evidence.tool not in VALID_VERIFICATION_TOOLS:
        return False, f"Invalid tool '{evidence.tool}'. Must be one of: {VALID_VERIFICATION_TOOLS}"
    # Both the verification target and its result summary are mandatory.
    for attr in ("target", "result"):
        if not getattr(evidence, attr):
            return False, f"evidence.{attr} is required"
    return True, ""
# =============================================================================
# Session State
# =============================================================================
@dataclass
class SessionState:
    """
    Manages the state of a code implementation session.

    Features:
    - Phase-gated execution (EXPLORATION → SEMANTIC → VERIFICATION → READY)
    - Server-side confidence calculation
    - QueryFrame for natural language structuring
    - Forest/Map dual search results
    - Write target validation

    Enforces phase-gated execution:
    - Tools are restricted based on current phase
    - Phase transitions require specific outputs
    - Prevents LLM from skipping steps
    """
    # Session identity
    session_id: str
    intent: str  # IMPLEMENT, MODIFY, INVESTIGATE, QUESTION
    query: str
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())
    # v3.8: Project path for agreements and learned_pairs
    repo_path: str = "."
    # v3.5: Decision Log for Outcome matching
    decision_log: dict | None = None
    # v3.6: QueryFrame for natural language structuring
    query_frame: "QueryFrame | None" = None
    risk_level: str = "LOW"  # HIGH, MEDIUM, LOW
    # Current phase
    phase: Phase = Phase.EXPLORATION
    # Phase outputs (each is None until its phase submits)
    exploration: ExplorationResult | None = None
    semantic: SemanticResult | None = None
    verification: VerificationResult | None = None
    impact_analysis: ImpactAnalysisResult | None = None  # v1.1
    pre_commit_review: PreCommitReviewResult | None = None  # v1.2
    # v1.2: Task branch management (renamed from overlay in v1.2.1)
    task_branch_name: str | None = None  # Git branch name (llm_task_{session_id}_from_{base})
    task_branch_enabled: bool = False  # Whether task branch is active
    branch_policy: str | None = None  # v1.13: branch handling policy (e.g., "continue")
    # v1.10: Gate level for phase necessity checks (simplified to 2 levels;
    # NOTE(review): the setter below also accepts "none" -- confirm intended set)
    _gate_level: str = field(default="auto", init=False)  # full, auto
    # Semantic search results (Forest/Map architecture)
    map_results: list[dict] = field(default_factory=list)  # map-search results
    forest_results: list[dict] = field(default_factory=list)  # forest-search results
    map_hit: bool = False  # whether the map search hit (used to short-circuit)
    # Tracking
    tool_calls: list[dict] = field(default_factory=list)
    phase_history: list[dict] = field(default_factory=list)
    # v1.4: Intervention System
    verification_failure_count: int = 0  # POST_IMPLEMENTATION_VERIFICATION failures
    intervention_count: int = 0  # Number of interventions triggered
    failure_history: list[dict] = field(default_factory=list)  # History of failures for analysis
    # v1.5: Quality Review
    quality_revert_count: int = 0  # Number of reverts from QUALITY_REVIEW to READY
    quality_review_enabled: bool = True  # Whether quality review is enabled (--no-quality sets this to False)
    quality_review_max_revert: int = 3  # Max revert count before forced completion
    quality_review_completed: bool = False  # Whether quality review passed (issues_found=false)
    # v1.8: PRE_COMMIT + QUALITY_REVIEW Order Change
    commit_prepared: bool = False  # Whether commit is prepared (waiting for quality review)
    prepared_commit_message: str | None = None  # Commit message for prepared commit
    prepared_kept_files: list[str] = field(default_factory=list)  # Files to keep for prepared commit
    prepared_discarded_files: list[str] = field(default_factory=list)  # Files to discard for prepared commit
    # v1.8: Only Explore Mode
    skip_implementation: bool = False  # Whether to skip implementation phase (--only-explore sets this to True)
    # v1.10: Phase Necessity Assessments
    phase_assessments: dict[str, dict] = field(default_factory=dict)  # Record of Q1/Q2/Q3 check results
    # v1.11: Task Orchestration (READY sub-steps)
    tasks: list[TaskModel] = field(default_factory=list)
    ready_substep: str = "plan"  # plan, implement, complete
    # v1.13: Summary-only payload persistence
    phase_payloads: dict[str, dict] = field(default_factory=dict)
    # v1.13: Compaction resilience counter
    compaction_count: int = 0
    # v1.11: Session flags (parsed from command options by LLM)
    no_verify: bool = False  # --no-verify
    no_quality: bool = False  # --no-quality (alias for quality_review_enabled=False)
    fast_mode: bool = False  # --fast
    quick_mode: bool = False  # --quick
    no_doc: bool = False  # --no-doc
    no_intervention: bool = False  # --no-intervention / -ni
    # v1.7: Ctags Performance Optimization
    definitions_cache: dict[tuple[str, str, str | None, bool], dict] = field(default_factory=dict)
    cache_stats: dict[str, int] = field(default_factory=lambda: {"hits": 0, "misses": 0})
    @property
    def gate_level(self) -> str:
        """Get gate level for phase necessity checks ("full", "auto", or "none")."""
        return self._gate_level

    @gate_level.setter
    def gate_level(self, value: str) -> None:
        """
        Set gate level for phase necessity checks.

        Args:
            value: "full" (execute all phases), "auto" (check before each
                phase), or "none" (also accepted by the validation below;
                presumably disables necessity checks -- TODO confirm).

        Raises:
            ValueError: If value is not "full", "auto", or "none".
        """
        if value not in ("full", "auto", "none"):
            raise ValueError(f"gate_level must be 'full', 'auto', or 'none', got '{value}'")
        self._gate_level = value
def record_tool_call_start(self, tool: str, params: dict) -> None:
"""
Record tool call start (v1.8: Performance tracking).
Args:
tool: Tool name
params: Tool parameters
"""
now = datetime.now()
record = {
"tool": tool,
"params": params,
"phase": self.phase.name,
"started_at": now.isoformat(),
# completed_at will be added by record_tool_call_end
}
self.tool_calls.append(record)
def record_tool_call_end(
self,
result_summary: str,
result_detail: dict | None = None,
) -> None:
"""
Record tool call completion (v1.8: Performance tracking).
Args:
result_summary: Summary of result
result_detail: Detailed result (optional)
"""
if not self.tool_calls:
return
now = datetime.now()
last_call = self.tool_calls[-1]
last_call["completed_at"] = now.isoformat()
last_call["result_summary"] = result_summary
# Calculate execution time
if "started_at" in last_call:
started = datetime.fromisoformat(last_call["started_at"])
last_call["duration_seconds"] = (now - started).total_seconds()
if result_detail:
last_call["result_detail"] = result_detail
def record_tool_call(
self,
tool: str,
params: dict,
result_summary: str,
result_detail: dict | None = None,
) -> None:
"""
Record a tool call for tracking (legacy method for backward compatibility).
DEPRECATED: Use record_tool_call_start/end instead for performance tracking.
result_detail: 改善サイクルでの分析用。
"""
record = {
"tool": tool,
"params": params,
"result_summary": result_summary,
"phase": self.phase.name,
"timestamp": datetime.now().isoformat(),
}
# v3.5: 詳細結果があれば追加
if result_detail:
record["result_detail"] = result_detail
self.tool_calls.append(record)
def transition_to_phase(self, new_phase: Phase, reason: str = "") -> None:
"""
Transition to a new phase with timestamp tracking (v1.8: Performance tracking).
Args:
new_phase: The phase to transition to
reason: Reason for transition (e.g., "submit_understanding", "gate_requirement")
"""
now = datetime.now()
# Record end time for current phase
if self.phase_history:
last_phase = self.phase_history[-1]
if "ended_at" not in last_phase:
last_phase["ended_at"] = now.isoformat()
if "started_at" in last_phase:
started = datetime.fromisoformat(last_phase["started_at"])
last_phase["duration_seconds"] = (now - started).total_seconds()
# Transition to new phase
old_phase = self.phase
self.phase = new_phase
# Record new phase start
self.phase_history.append({
"phase": new_phase.name,
"started_at": now.isoformat(),
"reason": reason,
"from_phase": old_phase.name,
# ended_at will be added on next transition
})
def get_allowed_tools(self) -> list[str]:
"""
Get tools allowed in current phase.
Semantic Search Rule をコードで実装。
"""
if self.phase == Phase.EXPLORATION:
# semantic_search は明示的に除外
return [
"query", "search_text", "find_definitions",
"find_references", "analyze_structure", "get_symbols",
"get_function_at_line", "search_files",
"submit_exploration", # v1.10: submit_understanding removed
]
elif self.phase == Phase.SEMANTIC:
return [
"semantic_search",
"submit_semantic",
"search_text", "find_definitions", "find_references",
]
elif self.phase == Phase.VERIFICATION:
# semantic_search は明示的に除外
return [
"query", "search_text", "find_definitions",
"find_references", "analyze_structure",
"submit_verification",
]
elif self.phase == Phase.IMPACT_ANALYSIS:
# v1.1: analyze_impact と探索ツールを許可
return [
"analyze_impact",
"submit_impact_analysis",
"query", "search_text", "find_definitions",
"find_references", "analyze_structure",
]
elif self.phase == Phase.READY:
# v1.2: READY allows all tools except commit-related
return [
"*", # All exploration/implementation tools
"submit_for_review", # Transition to PRE_COMMIT
]
elif self.phase == Phase.PRE_COMMIT:
# v1.2: Only review and finalize tools
return [
"review_changes", # Get changes for review
"finalize_changes", # Apply reviewed changes
"merge_to_base", # Merge to base branch
]
return []
def is_tool_allowed(self, tool: str) -> bool:
"""Check if a tool is allowed in current phase."""
allowed = self.get_allowed_tools()
if "*" in allowed:
return True
return tool in allowed
def get_blocked_reason(self, tool: str) -> str:
"""Get reason why a tool is blocked."""
if self.phase == Phase.EXPLORATION:
if tool == "semantic_search":
return (
"semantic_search is not allowed in EXPLORATION phase. "
"First use code-intel tools (query, find_definitions, find_references). "
"Then call submit_exploration and check_phase_necessity(phase='SEMANTIC') to determine if semantic search is needed."
)
elif self.phase == Phase.VERIFICATION:
if tool == "semantic_search":
return (
"semantic_search is not allowed in VERIFICATION phase. "
"Use code-intel tools to verify hypotheses from semantic search."
)
return f"Tool '{tool}' is not allowed in phase {self.phase.name}"
def check_intent_before_write(self) -> None:
"""
READY フェーズで Write が要求された場合、Intent を確認。
INVESTIGATE のまま Write しようとしたら例外。
"""
if self.intent == "INVESTIGATE":
raise IntentReclassificationRequired(
"Intent is INVESTIGATE but Write was requested. "
"Re-classify intent to IMPLEMENT or MODIFY first."
)
def check_write_target(self, file_path: str, allow_new_files: bool = True) -> dict:
"""
Write 対象が探索済みかチェック。
「見てないコードを書くな」を物理化。
Args:
file_path: 書き込み対象のファイルパス
allow_new_files: 新規ファイル作成を許可するか
Returns:
{"allowed": bool, "error": str | None}
"""
if self.phase != Phase.READY:
return {
"allowed": False,
"error": f"Write not allowed in phase {self.phase.name}",
}
# 探索済みファイルを取得
explored_files: set[str] = set()
if self.exploration:
explored_files = set(self.exploration.files_analyzed)
# 検証で触れたファイルも追加
if self.verification:
for vh in self.verification.verified:
explored_files.update(vh.evidence.files)
# ファイルがない場合(QUESTION intent など)は制限なし
if not explored_files:
return {"allowed": True, "error": None}
is_valid, error = validate_write_target(file_path, explored_files, allow_new_files)
if is_valid:
return {"allowed": True, "error": None}
return {
"allowed": False,
"error": error,
"explored_files": list(explored_files),
"hint": "Add the file to exploration first, or run additional exploration.",
"recovery_options": {
"add_explored_files": {
"description": "Add files/directories to explored list without leaving READY phase",
"example": "session.add_explored_files(['tests_with_code/'])",
},
},
}
def add_explored_files(self, files: list[str]) -> dict:
"""
READYフェーズで探索済みファイルを追加登録。
check_write_target でブロックされた場合の軽量な復帰手段。
新しいディレクトリやファイルを探索済みとして追加できる。
Args:
files: 追加する探索済みファイル/ディレクトリのリスト
Returns:
{"success": bool, "added": list, "explored_files": list}
"""
if self.phase != Phase.READY:
return {
"success": False,
"error": f"add_explored_files is only allowed in READY phase, current: {self.phase.name}",
}
if not files:
return {
"success": False,
"error": "No files provided to add",
}
# exploration が None の場合は初期化
if self.exploration is None:
self.exploration = ExplorationResult()
added = []
for f in files:
if f not in self.exploration.files_analyzed:
self.exploration.files_analyzed.append(f)
added.append(f)
return {
"success": True,
"added": added,
"explored_files": self.exploration.files_analyzed,
"message": f"Added {len(added)} file(s) to explored list.",
}
# v1.15: submit_exploration, _validate_nl_symbol_mapping, _check_hypothesis_slots removed
# (integrated into submit_phase via code_intel_server._submit_exploration_v11)
# v1.15: submit_semantic removed (integrated into submit_phase)
def submit_verification(self, result: VerificationResult) -> dict:
"""
Submit verification results and move to ready.
evidence は構造化必須。
Returns: {"success": bool, "next_phase": str, "message": str}
"""
if self.phase != Phase.VERIFICATION:
return {
"success": False,
"next_phase": self.phase.name,
"message": f"Cannot submit verification in phase {self.phase.name}",
}
if not result.verified:
return {
"success": False,
"next_phase": self.phase.name,
"message": "Must verify at least one hypothesis.",
}
# v3.3: evidence の検証
for vh in result.verified:
is_valid, error = validate_verification_evidence(vh.evidence)
if not is_valid:
return {
"success": False,
"next_phase": self.phase.name,
"message": f"Invalid evidence for hypothesis '{vh.hypothesis}': {error}",
"valid_tools": list(VALID_VERIFICATION_TOOLS),
}
self.verification = result
# Record phase transition
self.phase_history.append({
"from": Phase.VERIFICATION.name,
"result": result.to_dict(),
"timestamp": datetime.now().isoformat(),
})
# v1.1: READY ではなく IMPACT_ANALYSIS へ
self.transition_to_phase(Phase.IMPACT_ANALYSIS, reason="submit_verification")
rejected = [v for v in result.verified if v.status == "rejected"]
if rejected:
return {
"success": True,
"next_phase": Phase.IMPACT_ANALYSIS.name,
"message": "Verification complete. Proceed to impact analysis.",
"warning": f"{len(rejected)} hypotheses were rejected. Do NOT implement based on rejected hypotheses.",
"rejected": [r.to_dict() for r in rejected],
"next_step": "Call analyze_impact with target files, then submit_impact_analysis with verified_files.",
}
return {
"success": True,
"next_phase": Phase.IMPACT_ANALYSIS.name,
"message": "All hypotheses verified. Proceed to impact analysis before implementation.",
"next_step": "Call analyze_impact with target files, then submit_impact_analysis with verified_files.",
}
def set_impact_analysis_context(
self,
target_files: list[str],
must_verify: list[str],
should_verify: list[str],
mode: str = "standard",
) -> None:
"""
v1.1: Store analyze_impact result for validation in submit_impact_analysis.
Called by analyze_impact tool handler.
"""
self.impact_analysis = ImpactAnalysisResult(
target_files=target_files,
must_verify=must_verify,
should_verify=should_verify,
mode=mode,
)
def submit_impact_analysis(
self,
verified_files: list[dict],
inferred_from_rules: list[str] | None = None,
) -> dict:
"""
v1.1: Submit impact analysis results and move to READY.
Validates that all must_verify files have responses.
Returns: {"success": bool, "next_phase": str, "message": str}
"""
if self.phase != Phase.IMPACT_ANALYSIS:
return {
"success": False,
"next_phase": self.phase.name,
"message": f"Cannot submit impact analysis in phase {self.phase.name}",
}
if self.impact_analysis is None:
return {
"success": False,
"next_phase": self.phase.name,
"message": "Must call analyze_impact before submit_impact_analysis.",
}
# Convert verified_files to VerifiedFile objects
verified = []
for vf in verified_files:
verified.append(VerifiedFile(
file=vf["file"],
status=vf["status"],
reason=vf.get("reason"),
))
# Validate: all must_verify files must have a response
verified_paths = {v.file for v in verified}
missing_must_verify = []
for must_file in self.impact_analysis.must_verify:
if must_file not in verified_paths:
missing_must_verify.append(must_file)
if missing_must_verify:
return {
"success": False,
"next_phase": self.phase.name,
"message": "Not all must_verify files have been verified.",
"missing_must_verify": missing_must_verify,
"hint": "Provide status for all must_verify files before proceeding.",
}
# Validate: status != will_modify requires reason
missing_reasons = []
for v in verified:
if v.status != "will_modify" and not v.reason:
missing_reasons.append(v.file)
if missing_reasons:
return {
"success": False,
"next_phase": self.phase.name,
"message": "Files with status != will_modify require a reason.",
"missing_reasons": missing_reasons,
}
# Update impact_analysis result
self.impact_analysis.verified_files = verified
self.impact_analysis.inferred_from_rules = inferred_from_rules or []
# Record phase transition
self.phase_history.append({
"from": Phase.IMPACT_ANALYSIS.name,
"result": self.impact_analysis.to_dict(),
"timestamp": datetime.now().isoformat(),
})
# v1.8: If skip_implementation is enabled, do not transition to READY
if self.skip_implementation:
# Keep phase as IMPACT_ANALYSIS (or could transition to a terminal phase if needed)
# Return exploration complete message
response = {
"success": True,
"next_phase": Phase.IMPACT_ANALYSIS.name,
"exploration_complete": True,
"message": "Exploration complete. Implementation skipped (--only-explore mode).",
"verified_count": len(verified),
"will_modify": [v.file for v in verified if v.status == "will_modify"],
}
# Check for should_verify warnings
should_verify_missing = []
for should_file in self.impact_analysis.should_verify:
if should_file not in verified_paths:
should_verify_missing.append(should_file)
if should_verify_missing:
response["warning"] = f"Some should_verify files were not verified: {should_verify_missing}"
return response
# Normal flow: transition to READY
self.transition_to_phase(Phase.READY, reason="submit_impact_analysis")
# Check for should_verify warnings
should_verify_missing = []
for should_file in self.impact_analysis.should_verify:
if should_file not in verified_paths:
should_verify_missing.append(should_file)
response = {
"success": True,
"next_phase": Phase.READY.name,
"message": "Impact analysis complete. Ready for implementation.",
"verified_count": len(verified),
"will_modify": [v.file for v in verified if v.status == "will_modify"],
}
if should_verify_missing:
response["warning"] = f"Some should_verify files were not verified: {should_verify_missing}"
return response
# v1.10: submit_verification_and_impact removed (v1.9 integration reverted)
# Use separate submit_verification and submit_impact_analysis instead
# v1.15: submit_for_review removed (integrated into submit_phase)
def submit_pre_commit_review(
self,
reviewed_files: list[dict],
review_notes: str = "",
) -> dict:
"""
v1.2: Submit garbage detection review results.
This validates that all changes have been reviewed.
Args:
reviewed_files: List of {"path": str, "decision": "keep"|"discard", "reason": str}
review_notes: Optional notes about the review
Returns: {"success": bool, "kept_files": list, "discarded_files": list}
"""
if self.phase != Phase.PRE_COMMIT:
return {
"success": False,
"message": f"Cannot submit review in phase {self.phase.name}. Must be in PRE_COMMIT phase.",
}
# Convert to ReviewedFile objects
reviewed = []
kept = []
discarded = []
for rf in reviewed_files:
decision = rf.get("decision", "keep")
path = rf["path"]
# Validate: discard requires reason
if decision == "discard" and not rf.get("reason"):
return {
"success": False,
"message": f"Discarding '{path}' requires a reason.",
}
reviewed.append(ReviewedFile(
path=path,
decision=decision,
reason=rf.get("reason"),
change_type=rf.get("change_type", "modified"),
))
if decision == "keep":
kept.append(path)
else:
discarded.append(path)
# Store result
self.pre_commit_review = PreCommitReviewResult(
total_changes=len(reviewed),
reviewed_files=reviewed,
kept_files=kept,
discarded_files=discarded,
review_notes=review_notes,
)
# Record phase transition
self.phase_history.append({
"from": Phase.PRE_COMMIT.name,
"result": self.pre_commit_review.to_dict(),
"timestamp": datetime.now().isoformat(),
})
return {
"success": True,
"kept_files": kept,
"discarded_files": discarded,
"message": f"Review complete. {len(kept)} files to keep, {len(discarded)} files to discard.",
"next_step": "Call finalize_changes to apply decisions and commit.",
}
def get_status(self) -> dict:
"""
Get current session status (v1.11: includes expected_payload for compaction resilience).
The response is self-contained: LLM can recover from compaction by reading this.
"""
phase_name = self.phase.name
step = PHASE_STEP_MAP.get(phase_name, 0)
# Determine the correct instruction/expected_payload key
ready_substep = None
if phase_name == "READY":
ready_substep = self._get_ready_substep_key()
step = {"plan": 12, "implement": 13, "complete": 14}.get(self.ready_substep, 12)
# Build completed steps list
completed_steps = self._get_completed_steps()
phase_response = get_phase_response(
phase_name,
step=step,
extra={"ready_substep": ready_substep} if ready_substep else None,
)
status = {
"session_id": self.session_id,
"intent": self.intent,
"phase": phase_name,
"step": step,
"completed_steps": completed_steps,
"instruction": phase_response.get("instruction"),
"expected_payload": phase_response.get("expected_payload"),
"task_progress": self._get_task_progress() if self.tasks else None,
"compaction_count": self.compaction_count,
}
# v1.2: Add task branch info if enabled
if self.task_branch_enabled:
status["task_branch"] = {
"enabled": True,
"branch": self.task_branch_name,
}
return status
def _get_ready_substep_key(self) -> str:
"""Get the expected payload key for READY sub-step."""
if self.ready_substep == "plan" or not self.tasks:
return "READY_PLAN"
elif self.ready_substep == "implement":
return "READY_IMPL"
else:
return "READY_COMPLETE"
def record_phase_summary(self, payload_key: str, step: int, summary: str) -> None:
"""Record summary-only payload for checkpoint persistence."""
if not isinstance(summary, str):
return
key = self._format_phase_payload_key(payload_key, step)
self.phase_payloads[key] = {"summary": summary.strip()}
def _format_phase_payload_key(self, payload_key: str, step: int) -> str:
label_map = {
"READY_PLAN": "READY_PLANNING",
"READY_IMPL": "READY_IMPLEMENTATION",
"READY_COMPLETE": "READY_COMPLETION",
}
label = label_map.get(payload_key, payload_key)
return f"step_{step:02d}_{label}"
def get_phase_summaries_flat(self) -> dict[str, str]:
"""Return phase_payloads as flat dict {key: summary_text} for compaction recovery."""
result = {}
for key, payload in self.phase_payloads.items():
if isinstance(payload, dict) and "summary" in payload:
result[key] = payload["summary"]
return result
def _get_completed_steps(self) -> list[int]:
"""Get list of completed step numbers from phase history."""
completed = []
seen_phases = set()
for entry in self.phase_history:
phase_name = entry.get("phase") or entry.get("from")
if phase_name and phase_name not in seen_phases:
seen_phases.add(phase_name)
step = PHASE_STEP_MAP.get(phase_name)
if step:
completed.append(step)
return sorted(completed)
def _get_task_progress(self) -> dict | None:
"""Get task progress summary."""
if not self.tasks:
return None
total = len(self.tasks)
completed = sum(1 for t in self.tasks if t.status == "completed")
pending = sum(1 for t in self.tasks if t.status == "pending")
next_task = None
for t in self.tasks:
if t.status == "pending":
next_task = t.to_dict()
break
return {
"total": total,
"completed": completed,
"pending": pending,
"next_task": next_task,
"tasks": [t.to_dict() for t in self.tasks],
}
# =========================================================================
# v1.11: Task Orchestration Methods (READY phase sub-steps)
# =========================================================================
def register_tasks(self, tasks_data: list[dict]) -> dict:
"""
v1.11 Step 12: Register task list (idempotent).
Accepts full task list. Replaces any existing tasks.
Works for both initial registration and revert re-planning.
"""
if self.phase != Phase.READY:
return {
"error": "phase_mismatch",
"current_phase": self.phase.name,
"message": "Task registration is only allowed in READY phase.",
}
if not tasks_data:
return {
"error": "empty_tasks",
"message": "At least one task is required.",
}
# Check for duplicate IDs
ids = [t.get("id") for t in tasks_data]
if len(ids) != len(set(ids)):
return {
"error": "duplicate_task_ids",
"message": "Task IDs must be unique.",
}
# Check that at least one task is pending
has_pending = any(t.get("status", "pending") == "pending" for t in tasks_data)
if not has_pending:
return {
"error": "no_pending_tasks",
"message": "At least one task must have status 'pending'.",
}
# Build task list
self.tasks = []
for t in tasks_data:
# v1.16: Parse checklist if present
checklist = []
if "checklist" in t:
for item in t["checklist"]:
checklist.append(ChecklistItem(
item=item.get("item", ""),
status=item.get("status", "pending"),
evidence=item.get("evidence"),
reason=item.get("reason"),
))
self.tasks.append(TaskModel(
id=t["id"],
description=t.get("description", ""),
status=t.get("status", "pending"),
failure_count=t.get("failure_count", 0),
revert_reason=t.get("revert_reason"),
checklist=checklist,
))
self.ready_substep = "implement"
# Find next pending task
next_task = None
for t in self.tasks:
if t.status == "pending":
next_task = t
break
return {
"success": True,
"tasks_registered": len(self.tasks),
"pending": sum(1 for t in self.tasks if t.status == "pending"),
"completed": sum(1 for t in self.tasks if t.status == "completed"),
"next_task": next_task.to_dict() if next_task else None,
}
def complete_task(
self,
task_id: str,
summary: str,
checklist: list[dict] | None = None,
) -> dict:
"""
v1.11 Step 13: Report task completion.
v1.16: Added checklist parameter for tracking individual implementation items.
Validates order (must complete in registered order for pending tasks).
Checklist validation is done in code_intel_server.py before calling this.
"""
if self.phase != Phase.READY:
return {
"error": "phase_mismatch",
"current_phase": self.phase.name,
"message": "Task completion only in READY phase.",
}
if not self.tasks:
return {
"error": "no_tasks",
"message": "No tasks registered. Call submit_phase with tasks first.",
}
# Find the task
target = None
for t in self.tasks:
if t.id == task_id:
target = t
break
if target is None:
return {
"error": "unknown_task",
"message": f"Task '{task_id}' not found.",
"registered_tasks": [t.id for t in self.tasks],
}
if target.status == "completed":
return {
"error": "already_completed",
"message": f"Task '{task_id}' is already completed.",
}
# Enforce order: must be the first pending task
first_pending = None
for t in self.tasks:
if t.status == "pending":
first_pending = t
break
if first_pending and first_pending.id != task_id:
return {
"error": "wrong_order",
"message": f"Must complete '{first_pending.id}' before '{task_id}'.",
"expected_task": first_pending.to_dict(),
}
# v1.16: Update checklist with reported status
if checklist:
updated_checklist = []
for reported_item in checklist:
updated_checklist.append(ChecklistItem(
item=reported_item.get("item", ""),
status=reported_item.get("status", "pending"),
evidence=reported_item.get("evidence"),
reason=reported_item.get("reason"),
))
target.checklist = updated_checklist
# Mark as completed
target.status = "completed"
# Check if all complete
all_done = all(t.status == "completed" for t in self.tasks)
if all_done:
self.ready_substep = "complete"
return {
"all_complete": True,
**get_phase_response("READY", step=14, extra={"ready_substep": "READY_COMPLETE"}),
}
# Find next task
next_task = None
for t in self.tasks:
if t.status == "pending":
next_task = t
break
progress = self._get_task_progress()
return {
"success": True,
"progress": progress,
"next_task": next_task.to_dict() if next_task else None,
**get_phase_response("READY", step=13, extra={"ready_substep": "READY_IMPL"}),
}
def check_all_tasks_complete(self) -> dict:
"""
v1.11 Step 14: Check if all tasks are complete before proceeding.
Returns error if tasks are pending or not registered.
"""
if self.phase != Phase.READY:
return {
"error": "phase_mismatch",
"current_phase": self.phase.name,
}
# For IMPLEMENT/MODIFY: tasks must be registered and completed
if self.intent in ("IMPLEMENT", "MODIFY"):
if not self.tasks:
return {
"error": "no_tasks_registered",
"message": "No tasks registered. Submit task plan first.",
}
pending = [t for t in self.tasks if t.status == "pending"]
if pending:
return {
"error": "incomplete_tasks",
"message": f"{len(pending)} task(s) still pending.",
"pending_tasks": [t.to_dict() for t in pending],
}
return {"success": True, "all_complete": True}
# =========================================================================
# v1.4: Intervention System Methods
# =========================================================================
def record_verification_failure(self, failure_info: dict) -> dict:
"""
Record a verification failure for intervention tracking.
Args:
failure_info: {
"phase": str, # Phase where failure occurred (e.g., "POST_IMPLEMENTATION_VERIFICATION")
"error_message": str, # What differed from expectation
"problem_location": str, # Where the problem was found
"observed_values": str, # Actual values observed
"attempt_number": int, # Which attempt this was
}
Returns:
{"recorded": bool, "failure_count": int, "intervention_triggered": bool, "intervention_data": dict | None}
"""
self.verification_failure_count += 1
# Add to failure history
failure_record = {
"count": self.verification_failure_count,
"timestamp": datetime.now().isoformat(),
**failure_info,
}
self.failure_history.append(failure_record)
# Check if intervention should be triggered (threshold = 3)
intervention_triggered = self.verification_failure_count >= 3
result = {
"recorded": True,
"failure_count": self.verification_failure_count,
"intervention_triggered": intervention_triggered,
"intervention_data": None,
}
if intervention_triggered:
result["intervention_data"] = self._get_intervention_data()
return result
def _get_intervention_data(self) -> dict:
"""
Get intervention data when threshold is reached.
Returns data needed for LLM to select and follow intervention prompt.
"""
# Force user_escalation after 2 interventions
force_user_escalation = self.intervention_count >= 2
return {
"failure_count": self.verification_failure_count,
"intervention_count": self.intervention_count,
"force_user_escalation": force_user_escalation,
"failure_history": self.failure_history[-3:], # Last 3 failures
"available_prompts": [
"structure_review", # Layout/positioning issues
"hypothesis_review", # Error messages changing
"step_back", # General stuck state
"user_escalation", # Escalate to user (mandatory after 2 interventions)
],
"prompt_selection_guide": {
"structure_review": "Select when layout/positioning adjustments are repeating",
"hypothesis_review": "Select when error messages change each time",
"step_back": "Select for general stuck state",
"user_escalation": "MANDATORY if intervention_count >= 2, otherwise select when other interventions haven't helped",
},
"instructions": (
"1. Analyze failure_history to understand the pattern\n"
"2. Select appropriate intervention prompt from available_prompts\n"
"3. Read the intervention prompt from .code-intel/interventions/{selected}.md\n"
"4. Follow the instructions in the prompt\n"
"5. If force_user_escalation is true, MUST use user_escalation.md"
),
}
def get_intervention_status(self) -> dict:
"""
Get current intervention system status.
Returns:
Status of verification failures and interventions.
"""
return {
"verification_failure_count": self.verification_failure_count,
"intervention_count": self.intervention_count,
"intervention_threshold": 3,
"force_escalation_threshold": 2,
"near_intervention": self.verification_failure_count >= 2,
"requires_user_escalation": self.intervention_count >= 2,
"failure_history_count": len(self.failure_history),
}
def to_checkpoint_dict(self) -> dict:
    """Minimal serialization for checkpoint persistence (v1.13)."""
    # READY expands into three sub-steps; every other phase maps directly
    # through PHASE_STEP_MAP (unknown names fall back to 0).
    if self.phase == Phase.READY:
        step = {"plan": 12, "implement": 13, "complete": 14}.get(self.ready_substep, 12)
    else:
        step = PHASE_STEP_MAP.get(self.phase.name, 0)
    flags = {
        "no_verify": self.no_verify,
        "no_quality": self.no_quality,
        "fast": self.fast_mode,
        "quick": self.quick_mode,
        "no_doc": self.no_doc,
        "no_intervention": self.no_intervention,
        "skip_implementation": self.skip_implementation,
    }
    phase_state = {
        "current_phase": self.phase.name,
        "step": step,
        "ready_substep": self.ready_substep,
        "gate_level": self._gate_level,
        "risk_level": self.risk_level,
    }
    counters = {
        "verification_failure_count": self.verification_failure_count,
        "quality_revert_count": self.quality_revert_count,
        "intervention_count": self.intervention_count,
    }
    quality_review = {
        "enabled": self.quality_review_enabled,
        "completed": self.quality_review_completed,
        "max_revert": self.quality_review_max_revert,
    }
    commit_state = {
        "commit_prepared": self.commit_prepared,
        "prepared_commit_message": self.prepared_commit_message,
        "prepared_kept_files": self.prepared_kept_files,
        "prepared_discarded_files": self.prepared_discarded_files,
    }
    orchestrator_state = {
        "session_id": self.session_id,
        "intent": self.intent,
        "query": self.query,
        "repo_path": self.repo_path,
        "flags": flags,
        "phase_state": phase_state,
        "branch_policy": self.branch_policy,
        "counters": counters,
        "quality_review": quality_review,
        "commit_state": commit_state,
        "tasks": [task.to_dict() for task in self.tasks],
        "compaction_count": self.compaction_count,
    }
    return {
        "orchestrator_state": orchestrator_state,
        "phase_payloads": self.phase_payloads,
        "checkpoint_version": "1.13",
        "checkpoint_at": datetime.now().isoformat(),
    }
def to_dict(self) -> dict:
    """Full serialization for checkpoint persistence (v1.12)."""
    # v1.7: tuple cache keys are not JSON-serializable; store each key as a
    # JSON-encoded 4-element array instead.
    serialized_cache = {
        json.dumps([key[0], key[1], key[2], key[3]]): value
        for key, value in self.definitions_cache.items()
    }
    identity = {
        "session_id": self.session_id,
        "intent": self.intent,
        "query": self.query,
        "created_at": self.created_at,
        "repo_path": self.repo_path,
    }
    phase_state = {
        "current_phase": self.phase.name,
        "ready_substep": self.ready_substep,
        "gate_level": self._gate_level,
        "risk_level": self.risk_level,
        "decision_log": self.decision_log,
    }
    # Nested dataclasses serialize via their own to_dict(); None stays None.
    nested = {
        "query_frame": self.query_frame.to_dict() if self.query_frame else None,
        "exploration": self.exploration.to_dict() if self.exploration else None,
        "semantic": self.semantic.to_dict() if self.semantic else None,
        "verification": self.verification.to_dict() if self.verification else None,
        "impact_analysis": self.impact_analysis.to_dict() if self.impact_analysis else None,
        "pre_commit_review": self.pre_commit_review.to_dict() if self.pre_commit_review else None,
    }
    branch_and_search = {
        "task_branch_enabled": self.task_branch_enabled,
        "task_branch_name": self.task_branch_name,
        "branch_policy": self.branch_policy,
        "map_results": self.map_results,
        "forest_results": self.forest_results,
        "map_hit": self.map_hit,
    }
    tracking = {
        "tool_calls": self.tool_calls,
        "phase_history": self.phase_history,
        # v1.4: Intervention System
        "verification_failure_count": self.verification_failure_count,
        "intervention_count": self.intervention_count,
        "failure_history": self.failure_history,
        # v1.5: Quality Review
        "quality_revert_count": self.quality_revert_count,
        "quality_review_enabled": self.quality_review_enabled,
        "quality_review_max_revert": self.quality_review_max_revert,
        "quality_review_completed": self.quality_review_completed,
    }
    commit_and_flags = {
        # v1.8: PRE_COMMIT
        "commit_prepared": self.commit_prepared,
        "prepared_commit_message": self.prepared_commit_message,
        "prepared_kept_files": self.prepared_kept_files,
        "prepared_discarded_files": self.prepared_discarded_files,
        # v1.8: Only Explore
        "skip_implementation": self.skip_implementation,
        # v1.10: Phase assessments
        "phase_assessments": self.phase_assessments,
        # v1.11: Tasks
        "tasks": [task.to_dict() for task in self.tasks],
        # v1.13: Summary-only payload persistence
        "phase_payloads": self.phase_payloads,
        # v1.13: Compaction resilience counter
        "compaction_count": self.compaction_count,
        # v1.11: Flags
        "no_verify": self.no_verify,
        "no_quality": self.no_quality,
        "fast_mode": self.fast_mode,
        "quick_mode": self.quick_mode,
        "no_doc": self.no_doc,
        "no_intervention": self.no_intervention,
    }
    return {
        **identity,
        **phase_state,
        **nested,
        **branch_and_search,
        **tracking,
        **commit_and_flags,
        # v1.7: Definitions cache (tuple keys serialized as JSON arrays)
        "definitions_cache": serialized_cache,
        "cache_stats": self.cache_stats,
        # v1.12: Checkpoint metadata
        "checkpoint_version": "1.12",
        "checkpoint_at": datetime.now().isoformat(),
    }
@classmethod
def from_dict(cls, data: dict) -> "SessionState":
    """Restore SessionState from checkpoint dict (v1.12)."""
    session = cls(
        session_id=data.get("session_id", "unknown"),
        intent=data.get("intent", "INVESTIGATE"),
        query=data.get("query", ""),
        repo_path=data.get("repo_path", "."),
    )
    session.created_at = data.get("created_at", session.created_at)
    # Phase: unknown names fall back to EXPLORATION.
    try:
        session.phase = Phase[data.get("current_phase", "EXPLORATION")]
    except KeyError:
        session.phase = Phase.EXPLORATION
    # Gate level only accepts the known values.
    raw_gate = data.get("gate_level", "auto")
    session._gate_level = raw_gate if raw_gate in ("full", "auto", "none") else "auto"
    # Nested dataclass payloads (restored only when present and truthy).
    restorers = {
        "query_frame": _restore_query_frame,
        "exploration": _restore_exploration,
        "semantic": _restore_semantic,
        "verification": _restore_verification,
        "impact_analysis": _restore_impact_analysis,
        "pre_commit_review": _restore_pre_commit_review,
    }
    for attr, restore in restorers.items():
        if data.get(attr):
            setattr(session, attr, restore(data[attr]))
    # Flat fields and the defaults used when absent from the checkpoint.
    flat_defaults = [
        ("ready_substep", "plan"),
        ("risk_level", "LOW"),
        ("decision_log", None),
        # Task branch
        ("task_branch_enabled", False),
        ("task_branch_name", None),
        ("branch_policy", None),
        # Semantic search
        ("map_results", []),
        ("forest_results", []),
        ("map_hit", False),
        # Tracking
        ("tool_calls", []),
        ("phase_history", []),
        # v1.4: Intervention
        ("verification_failure_count", 0),
        ("intervention_count", 0),
        ("failure_history", []),
        # v1.5: Quality Review
        ("quality_revert_count", 0),
        ("quality_review_enabled", True),
        ("quality_review_max_revert", 3),
        ("quality_review_completed", False),
        # v1.8: PRE_COMMIT
        ("commit_prepared", False),
        ("prepared_commit_message", None),
        ("prepared_kept_files", []),
        ("prepared_discarded_files", []),
        # v1.8: Only Explore
        ("skip_implementation", False),
        # v1.10: Phase assessments
        ("phase_assessments", {}),
        # v1.13: Payloads and compaction counter
        ("phase_payloads", {}),
        ("compaction_count", 0),
        # v1.11: Flags
        ("no_verify", False),
        ("no_quality", False),
        ("fast_mode", False),
        ("quick_mode", False),
        ("no_doc", False),
        ("no_intervention", False),
        ("cache_stats", {"hits": 0, "misses": 0}),
    ]
    for attr, default in flat_defaults:
        setattr(session, attr, data.get(attr, default))
    # v1.11: Tasks
    session.tasks = [
        TaskModel(
            id=td.get("id", ""),
            description=td.get("description", ""),
            status=td.get("status", "pending"),
            failure_count=td.get("failure_count", 0),
            revert_reason=td.get("revert_reason"),
        )
        for td in data.get("tasks", [])
    ]
    # v1.7: Definitions cache — deserialize JSON array keys back to 4-tuples;
    # malformed keys are skipped silently.
    session.definitions_cache = {}
    for key_str, value in data.get("definitions_cache", {}).items():
        try:
            parts = json.loads(key_str)
            session.definitions_cache[(parts[0], parts[1], parts[2], parts[3])] = value
        except (json.JSONDecodeError, IndexError, KeyError):
            pass
    return session
@classmethod
def from_checkpoint(cls, data: dict) -> "SessionState":
    """Restore SessionState from v1.13 minimal checkpoint (fallback to v1.12).

    Accepts the v1.13 nested layout (top-level "orchestrator_state" dict);
    when that key is missing or not a dict, delegates to from_dict() for
    the flat v1.12 format.
    """
    orchestrator_state = data.get("orchestrator_state")
    if not isinstance(orchestrator_state, dict):
        # Not a v1.13 checkpoint — treat the whole payload as v1.12 flat format.
        return cls.from_dict(data)
    session = cls(
        session_id=orchestrator_state.get("session_id", "unknown"),
        intent=orchestrator_state.get("intent", "INVESTIGATE"),
        query=orchestrator_state.get("query", ""),
        repo_path=orchestrator_state.get("repo_path", "."),
    )
    flags = orchestrator_state.get("flags", {})
    if isinstance(flags, dict):
        session.no_verify = flags.get("no_verify", False)
        session.no_quality = flags.get("no_quality", False)
        session.fast_mode = flags.get("fast", False)
        session.quick_mode = flags.get("quick", False)
        session.no_doc = flags.get("no_doc", False)
        session.no_intervention = flags.get("no_intervention", False)
        session.skip_implementation = flags.get("skip_implementation", False)
        # Derived from the flag; may be overwritten below by quality_review["enabled"].
        session.quality_review_enabled = not session.no_quality
    phase_state = orchestrator_state.get("phase_state", {})
    if isinstance(phase_state, dict):
        phase_name = phase_state.get("current_phase", "EXPLORATION")
        try:
            session.phase = Phase[phase_name]
        except KeyError:
            # Unknown phase name (e.g. checkpoint from another version): reset.
            session.phase = Phase.EXPLORATION
        session.ready_substep = phase_state.get("ready_substep", "plan")
        gate_level = phase_state.get("gate_level", "auto")
        # Only the known gate levels are accepted; anything else → "auto".
        session._gate_level = gate_level if gate_level in ("full", "auto", "none") else "auto"
        session.risk_level = phase_state.get("risk_level", "LOW")
    session.branch_policy = orchestrator_state.get("branch_policy")
    counters = orchestrator_state.get("counters", {})
    if isinstance(counters, dict):
        session.verification_failure_count = counters.get("verification_failure_count", 0)
        session.quality_revert_count = counters.get("quality_revert_count", 0)
        session.intervention_count = counters.get("intervention_count", 0)
    quality_review = orchestrator_state.get("quality_review", {})
    if isinstance(quality_review, dict):
        session.quality_review_enabled = quality_review.get("enabled", True)
        session.quality_review_completed = quality_review.get("completed", False)
        session.quality_review_max_revert = quality_review.get("max_revert", 3)
    commit_state = orchestrator_state.get("commit_state", {})
    if isinstance(commit_state, dict):
        session.commit_prepared = commit_state.get("commit_prepared", False)
        session.prepared_commit_message = commit_state.get("prepared_commit_message")
        session.prepared_kept_files = commit_state.get("prepared_kept_files", [])
        session.prepared_discarded_files = commit_state.get("prepared_discarded_files", [])
    tasks_data = orchestrator_state.get("tasks", [])
    session.tasks = []
    if isinstance(tasks_data, list):
        for td in tasks_data:
            if not isinstance(td, dict):
                # Skip malformed task entries rather than failing the restore.
                continue
            session.tasks.append(TaskModel(
                id=td.get("id", ""),
                description=td.get("description", ""),
                status=td.get("status", "pending"),
                failure_count=td.get("failure_count", 0),
                revert_reason=td.get("revert_reason"),
            ))
    # NOTE(review): phase_payloads is read from the checkpoint's top level,
    # while compaction_count sits inside orchestrator_state — this matches
    # the asymmetric layout written by to_checkpoint_dict().
    session.phase_payloads = data.get("phase_payloads", {})
    # v1.13: Compaction resilience counter
    session.compaction_count = orchestrator_state.get("compaction_count", 0)
    return session
# =============================================================================
# v1.12: Checkpoint Restore Helpers
# =============================================================================
def _restore_query_frame(data: dict) -> "QueryFrame":
    """Restore QueryFrame from dict."""
    from tools.query_frame import QueryFrame, MappedSymbol, SlotSource, SlotEvidence

    def build_evidence(raw: dict) -> SlotEvidence:
        # Shared SlotEvidence reconstruction for mapped symbols and slots.
        return SlotEvidence(
            tool=raw.get("tool", ""),
            params=raw.get("params", {}),
            result_summary=raw.get("result_summary", ""),
            timestamp=raw.get("timestamp", ""),
        )

    frame = QueryFrame(raw_query=data.get("raw_query", ""))
    frame.target_feature = data.get("target_feature")
    frame.trigger_condition = data.get("trigger_condition")
    frame.observed_issue = data.get("observed_issue")
    frame.desired_action = data.get("desired_action")
    frame.slot_quotes = data.get("slot_quotes", {})
    # Mapped symbols (evidence is optional per symbol).
    for entry in data.get("mapped_symbols", []):
        raw_evidence = entry.get("evidence")
        frame.mapped_symbols.append(MappedSymbol(
            name=entry.get("name", ""),
            source=SlotSource(entry.get("source", "UNRESOLVED")),
            confidence=entry.get("confidence", 0.0),
            evidence=build_evidence(raw_evidence) if raw_evidence else None,
        ))
    # Per-slot provenance.
    for slot, source_value in data.get("slot_source", {}).items():
        frame.slot_source[slot] = SlotSource(source_value)
    # Per-slot evidence.
    for slot, raw_evidence in data.get("slot_evidence", {}).items():
        frame.slot_evidence[slot] = build_evidence(raw_evidence)
    return frame
def _restore_exploration(data: dict) -> ExplorationResult:
    """Restore ExplorationResult from dict."""
    # All list-valued fields share the same "default to []" restore pattern.
    list_fields = (
        "symbols_identified",
        "entry_points",
        "existing_patterns",
        "files_analyzed",
        "tools_used",
    )
    kwargs = {name: data.get(name, []) for name in list_fields}
    return ExplorationResult(notes=data.get("notes", ""), **kwargs)
def _restore_semantic(data: dict) -> SemanticResult:
    """Restore SemanticResult from dict."""
    hypotheses = [
        Hypothesis(
            text=entry.get("text", ""),
            confidence=entry.get("confidence", "medium"),
        )
        for entry in data.get("hypotheses", [])
    ]
    # semantic_reason is an enum value; unknown serialized values are dropped.
    reason = None
    raw_reason = data.get("semantic_reason")
    if raw_reason:
        try:
            reason = SemanticReason(raw_reason)
        except ValueError:
            reason = None
    return SemanticResult(
        hypotheses=hypotheses,
        semantic_reason=reason,
        search_queries=data.get("search_queries", []),
    )
def _restore_verification(data: dict) -> VerificationResult:
    """Restore VerificationResult from dict."""
    restored = []
    for entry in data.get("verified", []):
        raw_evidence = entry.get("evidence", {})
        restored.append(VerifiedHypothesis(
            hypothesis=entry.get("hypothesis", ""),
            status=entry.get("status", "confirmed"),
            evidence=VerificationEvidence(
                tool=raw_evidence.get("tool", ""),
                target=raw_evidence.get("target", ""),
                result=raw_evidence.get("result", ""),
                files=raw_evidence.get("files", []),
            ),
        ))
    return VerificationResult(
        verified=restored,
        all_confirmed=data.get("all_confirmed", False),
    )
def _restore_impact_analysis(data: dict) -> ImpactAnalysisResult:
    """Restore ImpactAnalysisResult from dict."""
    restored_files = [
        VerifiedFile(
            file=entry.get("file", ""),
            status=entry.get("status", ""),
            reason=entry.get("reason"),
        )
        for entry in data.get("verified_files", [])
    ]
    return ImpactAnalysisResult(
        target_files=data.get("target_files", []),
        must_verify=data.get("must_verify", []),
        should_verify=data.get("should_verify", []),
        verified_files=restored_files,
        inferred_from_rules=data.get("inferred_from_rules", []),
        mode=data.get("mode", "standard"),
    )
def _restore_pre_commit_review(data: dict) -> PreCommitReviewResult:
    """Restore PreCommitReviewResult from dict."""
    files = [
        ReviewedFile(
            path=entry.get("path", ""),
            decision=entry.get("decision", "keep"),
            reason=entry.get("reason"),
            change_type=entry.get("change_type", "modified"),
        )
        for entry in data.get("reviewed_files", [])
    ]
    return PreCommitReviewResult(
        total_changes=data.get("total_changes", 0),
        reviewed_files=files,
        kept_files=data.get("kept_files", []),
        discarded_files=data.get("discarded_files", []),
        review_notes=data.get("review_notes", ""),
    )
# =============================================================================
# Session Manager
# =============================================================================
class SessionManager:
"""
Manages multiple sessions.
In practice, there's usually one active session per conversation,
but this allows for session tracking and recovery.
"""
def __init__(self):
self._sessions: dict[str, SessionState] = {}
self._active_session_id: str | None = None
def create_session(
self,
intent: str,
query: str,
session_id: str | None = None,
repo_path: str = ".",
gate_level: str = "high",
) -> SessionState:
"""
Create a new session.
Args:
intent: IMPLEMENT, MODIFY, INVESTIGATE, QUESTION
query: User's original query
session_id: Optional session ID (auto-generated if not provided)
repo_path: agreements と learned_pairs の保存先を指定
gate_level: Gate level for exploration phases
- "high": Strict requirements (default)
- "middle": Standard requirements
- "low": Minimal requirements
- "auto": Server determines based on risk
- "none": Skip exploration phases, go directly to READY
"""
if session_id is None:
session_id = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
# Determine initial phase based on intent and gate_level
if intent == "QUESTION":
# QUESTION intent always skips to READY
initial_phase = Phase.READY
elif gate_level == "none":
# --quick / -g=n: Skip exploration phases
initial_phase = Phase.READY
else:
# Normal flow: Start with EXPLORATION
initial_phase = Phase.EXPLORATION
session = SessionState(
session_id=session_id,
intent=intent,
query=query,
phase=initial_phase,
repo_path=repo_path,
)
# Store gate_level for later use (e.g., in evaluate_exploration)
session._gate_level = gate_level
self._sessions[session_id] = session
self._active_session_id = session_id
return session
def get_session(self, session_id: str) -> SessionState | None:
"""Get a session by ID."""
return self._sessions.get(session_id)
def get_active_session(self) -> SessionState | None:
"""Get the currently active session."""
if self._active_session_id:
return self._sessions.get(self._active_session_id)
return None
def set_active_session(self, session_id: str) -> bool:
"""Set the active session."""
if session_id in self._sessions:
self._active_session_id = session_id
return True
return False
def list_sessions(self) -> list[dict]:
"""List all sessions with basic info."""
return [
{
"session_id": s.session_id,
"intent": s.intent,
"phase": s.phase.name,
"created_at": s.created_at,
"active": s.session_id == self._active_session_id,
}
for s in self._sessions.values()
]
@staticmethod
def _infer_task_branch_state(session: SessionState, repo_path: str) -> None:
if session.task_branch_name:
return
try:
current_proc = subprocess.run(
["git", "rev-parse", "--abbrev-ref", "HEAD"],
cwd=repo_path,
capture_output=True,
text=True,
)
current = current_proc.stdout.strip() if current_proc.returncode == 0 else ""
if current.startswith("llm_task_"):
session.task_branch_enabled = True
session.task_branch_name = current
return
pattern = f"llm_task_{session.session_id}_from_*"
list_proc = subprocess.run(
["git", "branch", "--list", pattern],
cwd=repo_path,
capture_output=True,
text=True,
)
if list_proc.returncode == 0 and list_proc.stdout:
branches = [
b.strip().lstrip("* ")
for b in list_proc.stdout.strip().split("\n")
if b.strip()
]
if len(branches) == 1:
session.task_branch_enabled = True
session.task_branch_name = branches[0]
except Exception:
pass
# =========================================================================
# v1.12: Checkpoint Persistence
# =========================================================================
MAX_CHECKPOINT_BYTES = 256 * 1024
def save_checkpoint(self, session_id: str, base_path: str) -> bool:
"""Save session state to .code-intel/sessions/ (atomic write)."""
session = self._sessions.get(session_id)
if session is None:
return False
sessions_dir = Path(base_path) / ".code-intel" / "sessions"
tmp_path = None
try:
sessions_dir.mkdir(parents=True, exist_ok=True)
target_path = sessions_dir / f"{session_id}.json"
data = session.to_checkpoint_dict()
def _dump_size(d: dict) -> int:
return len(json.dumps(d, ensure_ascii=False, indent=2).encode("utf-8"))
def _compress_phase_payloads(payloads: dict, limit: int) -> dict:
compressed: dict[str, dict] = {}
for key, value in payloads.items():
summary = ""
if isinstance(value, dict):
summary = str(value.get("summary", ""))
if len(summary) > limit:
summary = summary[:limit].rstrip() + "..."
compressed[key] = {"summary": summary}
return compressed
if _dump_size(data) > self.MAX_CHECKPOINT_BYTES:
payloads = data.get("phase_payloads", {})
if isinstance(payloads, dict):
for limit in (400, 200, 100):
data["phase_payloads"] = _compress_phase_payloads(payloads, limit)
if _dump_size(data) <= self.MAX_CHECKPOINT_BYTES:
break
if _dump_size(data) > self.MAX_CHECKPOINT_BYTES:
items = list(data.get("phase_payloads", {}).items())
data["phase_payloads"] = dict(items[-10:])
if _dump_size(data) > self.MAX_CHECKPOINT_BYTES:
data["phase_payloads"] = {}
# Atomic write: tmp file → rename
fd, tmp_path = tempfile.mkstemp(
dir=str(sessions_dir), suffix=".tmp"
)
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
os.rename(tmp_path, str(target_path))
return True
except Exception:
# Clean up tmp file on failure
if tmp_path:
try:
os.unlink(tmp_path)
except Exception:
pass
return False
def load_checkpoint(self, session_id: str, base_path: str) -> SessionState | None:
"""Load session state from checkpoint file."""
cp_path = Path(base_path) / ".code-intel" / "sessions" / f"{session_id}.json"
if not cp_path.exists():
return None
try:
with open(cp_path, "r", encoding="utf-8") as f:
data = json.load(f)
session = SessionState.from_checkpoint(data)
# Register in memory
self._sessions[session.session_id] = session
self._active_session_id = session.session_id
self._infer_task_branch_state(session, session.repo_path or base_path)
return session
except Exception:
return None
def list_checkpoints(self, base_path: str) -> list[dict]:
"""List available checkpoints with metadata."""
sessions_dir = Path(base_path) / ".code-intel" / "sessions"
if not sessions_dir.exists():
return []
checkpoints = []
for cp_file in sessions_dir.glob("*.json"):
try:
with open(cp_file, "r", encoding="utf-8") as f:
data = json.load(f)
if "orchestrator_state" in data:
orchestrator_state = data.get("orchestrator_state", {})
phase_state = orchestrator_state.get("phase_state", {})
checkpoints.append({
"session_id": orchestrator_state.get("session_id", cp_file.stem),
"phase": phase_state.get("current_phase", "UNKNOWN"),
"step": phase_state.get("step", 0),
"intent": orchestrator_state.get("intent", ""),
"query": orchestrator_state.get("query", ""),
"checkpoint_at": data.get("checkpoint_at", ""),
})
continue
checkpoints.append({
"session_id": data.get("session_id", cp_file.stem),
"phase": data.get("current_phase", "UNKNOWN"),
"step": PHASE_STEP_MAP.get(data.get("current_phase", ""), 0),
"intent": data.get("intent", ""),
"query": data.get("query", ""),
"checkpoint_at": data.get("checkpoint_at", ""),
})
except Exception:
pass
checkpoints.sort(key=lambda cp: cp.get("checkpoint_at", ""))
return checkpoints
@staticmethod
def delete_checkpoint(session_id: str, base_path: str) -> bool:
"""Delete a single checkpoint file."""
cp_path = Path(base_path) / ".code-intel" / "sessions" / f"{session_id}.json"
if cp_path.exists():
try:
cp_path.unlink()
return True
except Exception:
return False
return False
@staticmethod
def delete_checkpoints(base_path: str) -> int:
"""Delete all checkpoint files (for --clean)."""
sessions_dir = Path(base_path) / ".code-intel" / "sessions"
if not sessions_dir.exists():
return 0
deleted = 0
for cp_file in sessions_dir.glob("*.json"):
try:
cp_file.unlink()
deleted += 1
except Exception:
pass
return deleted