#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.13"
# dependencies = [
# "mcp[cli]",
# "chromadb",
# "httpx",
# "pydantic",
# "pydantic-settings",
# ]
# ///
"""Claude Code / Factory Stop hook for capturing session learnings.
This hook runs when the agent finishes responding. It can analyze the
conversation to identify important patterns, decisions, or issues that
should be remembered for future sessions.
Enhanced with intelligent continuation control to prevent stopping when work is incomplete.
Usage:
Configure in ~/.claude/settings.json (Claude Code) or ~/.factory/settings.json (Factory):
{
"hooks": {
"Stop": [
{
"hooks": [
{
"type": "command",
"command": "python /path/to/recall/hooks/recall-stop.py",
"timeout": 10
}
]
}
]
}
}
Environment Variables:
RECALL_STOP_CHECK_TODOS - Check for incomplete TODOs (default: true)
RECALL_STOP_CHECK_GIT - Check for uncommitted changes (default: false)
RECALL_STOP_CHECK_TESTS - Check if tests were run after code changes (default: false)
RECALL_STOP_ENFORCE_MEMORY - Enforce memory storage for significant sessions (default: true)
Input (via stdin JSON):
{
"session_id": "abc123",
"transcript_path": "/path/to/transcript.jsonl",
"cwd": "/project/root",
"permission_mode": "default",
"hook_event_name": "Stop",
"stop_hook_active": false
}
Output:
- JSON with decision: "block" to continue, or nothing to allow stop
- If blocking, must provide "reason" for the agent to continue
"""
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
# Import DaemonClient for fast IPC with recall-daemon
try:
sys.path.insert(0, str(Path(__file__).parent))
from recall_client import DaemonClient
except ImportError:
DaemonClient = None # type: ignore[misc, assignment]
# Import session state for specific suggestions
try:
from recall_session_state import get_session_summary, set_checkpoint
SESSION_STATE_AVAILABLE = True
except ImportError:
SESSION_STATE_AVAILABLE = False
def read_hook_input() -> dict:
    """Parse the JSON payload piped to the hook on stdin.

    Returns an empty dict when stdin is a terminal, empty, unreadable,
    or contains invalid JSON — the hook must never crash on bad input.
    """
    try:
        if not sys.stdin.isatty():
            raw = sys.stdin.read()
            if raw:
                return json.loads(raw)
    except (OSError, json.JSONDecodeError):
        pass
    return {}
def get_project_namespace() -> str:
    """Derive the memory namespace from the current working directory.

    Returns "project:<dirname>" when the cwd looks like a project root
    (contains a VCS or build-system marker file), otherwise "global".
    """
    cwd_path = Path.cwd()
    markers = (".git", "pyproject.toml", "package.json", "Cargo.toml", "go.mod")
    if any((cwd_path / marker).exists() for marker in markers):
        return f"project:{cwd_path.name}"
    return "global"
def read_transcript_tail(transcript_path: str | None, lines: int = 50) -> str | None:
"""Read the last N lines of the transcript."""
if not transcript_path:
return None
try:
path = Path(transcript_path).expanduser()
if not path.exists():
return None
content = path.read_text()
# Get last N lines
all_lines = content.strip().split("\n")
return "\n".join(all_lines[-lines:])
except Exception:
return None
def read_full_transcript(transcript_path: str | None) -> str | None:
"""Read the full transcript for comprehensive analysis."""
if not transcript_path:
return None
try:
path = Path(transcript_path).expanduser()
if not path.exists():
return None
return path.read_text()
except Exception:
return None
def analyze_for_learnings(transcript_tail: str) -> list[dict[str, str]]:
    """Lightweight heuristic scan of the transcript tail for learnings.

    Returns a list of {type, content} dicts describing memories to store.
    Currently the detected signals are intentionally deferred to the
    SessionEnd hook (which sees full context), so this always returns an
    empty list.
    """
    learnings: list[dict[str, str]] = []
    lowered = transcript_tail.lower()

    if "error" in lowered and ("fixed" in lowered or "resolved" in lowered):
        # A resolved error is a candidate pattern, but SessionEnd captures
        # the full context, so nothing is stored here.
        pass

    if "remember" in lowered or "note that" in lowered:
        # Explicit "remember this" request — likewise deferred to SessionEnd.
        pass

    return learnings
def check_memory_stored(transcript: str) -> bool:
    """Detect whether any memory-store call happened during this session.

    Scans the JSONL transcript for either:
    1. an mcp-exec wrapper call whose wrappers include "recall" AND whose
       code contains "memory_store", or
    2. a direct memory_store tool invocation.

    Returns:
        True on the first match, False otherwise.
        MUST return False on parse errors to avoid false positives.
    """
    direct_store_tools = {
        "mcp__recall__memory_store",
        "mcp__recall__memory_store_tool",
        "memory_store",
        "memory_store_tool",
    }
    try:
        for raw in transcript.strip().split("\n"):
            try:
                record = json.loads(raw)
            except json.JSONDecodeError:
                continue
            if record.get("type") != "tool_use":
                continue
            name = record.get("tool_name") or record.get("name", "")
            args = record.get("params") or record.get("input", {})
            # Case 1: mcp-exec wrapper routed through recall.
            if name == "mcp__mcp-exec__execute_code_with_wrappers":
                wrappers = args.get("wrappers", [])
                if (
                    isinstance(wrappers, list)
                    and "recall" in wrappers
                    and "memory_store" in args.get("code", "")
                ):
                    return True
            # Case 2: direct memory_store tool call.
            if name in direct_store_tools:
                return True
        return False
    except Exception:
        # MUST return False on error to avoid false positives.
        return False
def session_had_significant_work(transcript: str) -> bool:
    """Check if the session had meaningful work worth remembering.

    Parses the JSONL transcript and counts significant tool calls:
    - Edit/Write/MultiEdit are always significant.
    - Bash counts only when its command matches a code-modifying pattern
      and no read-only pattern.
    - Read/Glob/Grep (information gathering) are ignored entirely.

    Returns:
        True if 2+ significant tool calls were made.
        MUST return False on parse errors to avoid false positives.
    """
    # Multi-word patterns are matched as substrings of the command line;
    # single-word patterns are matched against whole shell words, so that
    # e.g. "make" does not match "cmake" and "ls" does not match
    # "tools/run.sh" (the old plain substring test produced both errors).
    code_modifying_patterns = [
        "git commit", "git add", "npm install", "pip install", "uv add",
        "yarn add", "make", "cargo build", "go build", "docker", "mkdir",
        "touch", "rm", "mv", "cp", "chmod", "chown",
    ]
    read_only_patterns = [
        "git status", "git log", "git diff", "ls", "pwd", "cat", "head",
        "tail", "which", "echo", "python -c",
    ]

    def _matches(command: str, patterns: list[str]) -> bool:
        # Word-boundary matching for single-word patterns; substring
        # matching for multi-word patterns like "git commit".
        words = set(command.split())
        for pattern in patterns:
            stripped = pattern.strip()
            if " " in stripped:
                if stripped in command:
                    return True
            elif stripped in words:
                return True
        return False

    try:
        significant_count = 0
        for raw in transcript.strip().split("\n"):
            try:
                entry = json.loads(raw)
                # Only tool_use entries are relevant.
                if entry.get("type") != "tool_use":
                    continue
                tool_name = entry.get("tool_name") or entry.get("name", "")
                params = entry.get("params") or entry.get("input", {})
                # Edit, Write, MultiEdit are always significant.
                if tool_name in ["Edit", "Write", "MultiEdit"]:
                    significant_count += 1
                elif tool_name == "Bash":
                    command = params.get("command", "")
                    # Skip clearly read-only commands.
                    if _matches(command, read_only_patterns):
                        continue
                    # Count code-modifying commands.
                    if _matches(command, code_modifying_patterns):
                        significant_count += 1
                # Early exit once the threshold is met.
                if significant_count >= 2:
                    return True
            except json.JSONDecodeError:
                continue
        return significant_count >= 2
    except Exception:
        # MUST return False on error to avoid false positives.
        return False
def find_incomplete_todos(transcript: str) -> list[str]:
    """Parse transcript for TodoWrite tool calls and find incomplete items.

    Supports both TodoWrite parameter shapes:
    - list form, as sent by Claude Code's TodoWrite tool:
      {"todos": [{"content": "...", "status": "pending"}, ...]}
    - legacy scalar form: {"todo": "...", "status": "pending"}

    The latest status seen for each TODO text wins.

    Returns:
        List of "description (status: ...)" strings for every TODO whose
        final status is not "completed". Empty list on any parse failure.
    """
    incomplete_todos: list[str] = []
    try:
        # Track the latest status for each TODO text.
        todo_tracker: dict[str, str] = {}
        for raw in transcript.strip().split("\n"):
            try:
                entry = json.loads(raw)
            except json.JSONDecodeError:
                continue
            if entry.get("type") != "tool_use":
                continue
            tool_name = entry.get("tool_name") or entry.get("name", "")
            if tool_name != "TodoWrite":
                continue
            params = entry.get("params") or entry.get("input", {})
            # List form: one call carries the whole current TODO list.
            todos = params.get("todos")
            if isinstance(todos, list):
                for item in todos:
                    if isinstance(item, dict):
                        text = item.get("content", "")
                        if text:
                            todo_tracker[text] = item.get("status", "pending")
            # Legacy scalar form: one TODO per call.
            todo_text = params.get("todo", "")
            if todo_text:
                todo_tracker[todo_text] = params.get("status", "pending")
        incomplete_todos = [
            f"{text} (status: {status})"
            for text, status in todo_tracker.items()
            if status != "completed"
        ]
    except Exception:
        pass
    return incomplete_todos
def check_git_status(cwd: str) -> str | None:
"""Check for uncommitted changes using git status.
Returns a summary string if there are changes, None otherwise.
"""
try:
result = subprocess.run(
["git", "status", "--porcelain"],
check=False,
cwd=cwd,
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0 and result.stdout.strip():
# Parse git status output
lines = result.stdout.strip().split("\n")
modified = []
added = []
deleted = []
untracked = []
for line in lines:
if len(line) < 3:
continue
status = line[:2]
filename = line[3:]
if status.startswith("M") or status.endswith("M"):
modified.append(filename)
elif status.startswith("A"):
added.append(filename)
elif status.startswith("D"):
deleted.append(filename)
elif status.startswith("??"):
untracked.append(filename)
parts = []
if modified:
parts.append(f"{len(modified)} modified")
if added:
parts.append(f"{len(added)} added")
if deleted:
parts.append(f"{len(deleted)} deleted")
if untracked:
parts.append(f"{len(untracked)} untracked")
return ", ".join(parts) if parts else None
return None
except (subprocess.TimeoutExpired, FileNotFoundError, Exception):
return None
def should_have_committed(transcript: str) -> bool:
    """Return True if any user message in the transcript asked for a commit."""
    # Explicit commit-request phrases to look for in user messages.
    commit_keywords = (
        "commit",
        "create a commit",
        "git commit",
        "save these changes",
        "add and commit",
    )

    def _mentions_commit(text: str) -> bool:
        lowered = text.lower()
        return any(keyword in lowered for keyword in commit_keywords)

    try:
        for raw in transcript.strip().split("\n"):
            try:
                entry = json.loads(raw)
            except json.JSONDecodeError:
                continue
            # Only user messages count as commit requests.
            if entry.get("role") != "user":
                continue
            content = entry.get("content", "")
            if isinstance(content, str):
                if _mentions_commit(content):
                    return True
            elif isinstance(content, list):
                # Structured content: scan only the text parts.
                for part in content:
                    if isinstance(part, dict) and part.get("type") == "text":
                        if _mentions_commit(part.get("text", "")):
                            return True
    except Exception:
        pass
    return False
def code_modified_without_tests(transcript: str) -> bool:
    """Check whether code files were modified without running any tests.

    Returns True only when a non-test code file was changed via Write/Edit
    and no recognized test command was executed via Bash. Returns False on
    any parse failure.
    """
    code_extensions = (
        ".py", ".ts", ".js", ".jsx", ".tsx", ".go", ".rs", ".java", ".cpp", ".c",
    )
    # Paths containing these markers are treated as test files.
    test_markers = ("test_", "_test.", "tests/", "test/")
    test_commands = (
        "pytest",
        "npm test",
        "jest",
        "go test",
        "cargo test",
        "python -m pytest",
        "python -m unittest",
    )
    code_modified = False
    tests_run = False
    try:
        for raw in transcript.strip().split("\n"):
            try:
                entry = json.loads(raw)
            except json.JSONDecodeError:
                continue
            if entry.get("type") != "tool_use":
                continue
            tool_name = entry.get("tool_name") or entry.get("name", "")
            params = entry.get("params") or entry.get("input", {})
            if tool_name in ("Write", "Edit"):
                file_path = params.get("file_path", "")
                is_test_file = any(marker in file_path for marker in test_markers)
                # A non-test code file was touched.
                if (
                    file_path
                    and not is_test_file
                    and file_path.endswith(code_extensions)
                ):
                    code_modified = True
            elif tool_name == "Bash":
                command = params.get("command", "")
                if any(cmd in command for cmd in test_commands):
                    tests_run = True
        return code_modified and not tests_run
    except Exception:
        return False
def get_specific_suggestions(session_id: str) -> list[str]:
    """Build concrete memory suggestions from recorded session state.

    Uses what was captured during the session (unresolved errors, detected
    preferences, fix patterns) to propose specific things to store rather
    than a generic prompt.

    Args:
        session_id: Current session ID.

    Returns:
        List of specific suggestion strings; empty when session-state
        tracking is unavailable or the lookup fails.
    """
    if not SESSION_STATE_AVAILABLE:
        return []
    suggestions: list[str] = []
    try:
        summary = get_session_summary(session_id)
        # Unresolved errors become candidate error patterns (cap at 3).
        for error in summary.get("unresolved_errors", [])[:3]:
            message = error.get("error", "")[:60]
            tool = error.get("tool", "unknown")
            suggestions.append(f"• Error pattern: '{message}...' from {tool}")
        # Detected-but-unstored preferences (cap at 3).
        for pref in summary.get("unstored_preferences", [])[:3]:
            text = pref.get("content", "")[:60]
            # Internal tracking markers are not user-facing preferences.
            if text.startswith("__"):
                continue
            suggestions.append(f"• Preference: {text}")
        # Fix patterns that were not auto-stored (cap at 3).
        for pair in summary.get("unstored_patterns", [])[:3]:
            fix = pair.get("pattern", "")[:60]
            suggestions.append(f"• Fix pattern: {fix}")
    except Exception:
        pass
    return suggestions
def check_completion(transcript: str, cwd: str, session_id: str = "") -> dict | None:
    """Check if work is complete. Returns block decision or None.

    Performs multiple checks to determine if the agent should continue working:
    - Check 1: Incomplete TODOs (RECALL_STOP_CHECK_TODOS, default: true)
    - Check 2: Uncommitted changes when user requested commit (RECALL_STOP_CHECK_GIT, default: false)
    - Check 3: Code modified without running tests (RECALL_STOP_CHECK_TESTS, default: false)
    - Check 4: Memory storage enforcement for significant sessions (RECALL_STOP_ENFORCE_MEMORY, default: true)

    NOTE(review): despite the wording above (and the module docstring),
    Check 4 currently fires for EVERY session without a memory store —
    `session_had_significant_work()` is defined but never called here.
    Confirm whether unconditional enforcement is intended.

    Returns:
        dict with "decision": "block" and "reason" if work is incomplete.
        None if stopping is allowed.
    """
    # Get configuration from environment; any value other than "true"
    # (case-insensitive) disables the corresponding check.
    check_todos = os.getenv("RECALL_STOP_CHECK_TODOS", "true").lower() == "true"
    check_git = os.getenv("RECALL_STOP_CHECK_GIT", "false").lower() == "true"
    check_tests = os.getenv("RECALL_STOP_CHECK_TESTS", "false").lower() == "true"
    enforce_memory = os.getenv("RECALL_STOP_ENFORCE_MEMORY", "true").lower() == "true"
    # Check 1: Incomplete TODOs
    if check_todos:
        incomplete_todos = find_incomplete_todos(transcript)
        if incomplete_todos:
            reason_lines = ["Incomplete tasks remain:"]
            reason_lines.extend(
                f"- {todo}" for todo in incomplete_todos[:5]
            )  # Limit to 5 items
            if len(incomplete_todos) > 5:
                reason_lines.append(f"... and {len(incomplete_todos) - 5} more")
            reason_lines.append("\nComplete these tasks before stopping.")
            return {
                "decision": "block",
                "reason": "\n".join(reason_lines),
            }
    # Check 2: Uncommitted changes (skip for hooks directory - golden rule)
    if check_git and "/.claude/hooks" not in cwd:
        uncommitted = check_git_status(cwd)
        # Only block when the user actually asked for a commit.
        if uncommitted and should_have_committed(transcript):
            return {
                "decision": "block",
                "reason": f"Uncommitted changes: {uncommitted}.\n\nYou indicated you wanted to commit these changes. Please commit before stopping.",
            }
    # Check 3: Tests not run after code modifications
    if check_tests:
        if code_modified_without_tests(transcript):
            return {
                "decision": "block",
                "reason": "Code was modified but tests were not run.\n\nRun tests to verify the changes before stopping.",
            }
    # Check 4: Memory storage enforcement (ALWAYS - no significant work check)
    if enforce_memory:
        if not check_memory_stored(transcript):
            # Get specific suggestions from session state
            suggestions = get_specific_suggestions(session_id) if session_id else []
            if suggestions:
                # Show specific suggestions from session activity
                suggestions_text = "\n".join(suggestions)
                return {
                    "decision": "block",
                    "reason": f"""🧠 MEMORY STORAGE REQUIRED
You MUST store at least one memory before stopping.
**Specific learnings from this session to consider:**
{suggestions_text}
## Quick Store Command
```javascript
await recall.memory_store_tool({{
content: "...", // Use a suggestion above or describe what you learned
memory_type: "pattern", // or preference, decision
importance: 0.6,
namespace: "project:name"
}});
```
Use mcp__mcp-exec__execute_code_with_wrappers with wrappers: ["recall"]""",
                }
            else:
                # No specific suggestions - use generic prompt
                return {
                    "decision": "block",
                    "reason": """🧠 MEMORY STORAGE REQUIRED
You MUST store at least one memory before stopping.
## Available Memory Tools (via recall MCP wrapper)
| Tool | Purpose |
|------|---------|
| `memory_store_tool` | Store with semantic indexing + auto-deduplication |
| `memory_recall_tool` | Semantic search with graph expansion |
| `memory_validate_tool` | Adjust confidence based on success/failure |
| `memory_apply_tool` | Record memory usage (TRY phase) |
| `memory_outcome_tool` | Record result (LEARN phase) |
| `memory_relate_tool` | Create relationships (supersedes, contradicts) |
## Store Session Learning
```javascript
await recall.memory_store_tool({
content: "What happened: task requested, actions taken, outcome",
memory_type: "session", // or pattern, decision, preference
importance: 0.5,
namespace: "project:name" // or "global"
});
```
## TRY-LEARN Cycle (if applying existing memory)
```javascript
// 1. Before applying advice
await recall.memory_apply_tool({ memory_id: "...", context: "Applying to X" });
// 2. After seeing result
await recall.memory_outcome_tool({ memory_id: "...", success: true, outcome: "Worked" });
```
Use mcp__mcp-exec__execute_code_with_wrappers with wrappers: ["recall"]""",
                }
    # Mark checkpoint that stop was reviewed (only reached when no check
    # blocked, i.e. stopping is allowed).
    if session_id and SESSION_STATE_AVAILABLE:
        try:
            set_checkpoint(session_id, "stop_reviewed", True)
        except Exception:
            pass
    return None  # Allow stopping
def call_recall(tool_name: str, args: dict) -> dict:
    """Call recall MCP tool directly via --call mode.

    Args:
        tool_name: Name of the recall tool to invoke (e.g. "memory_store").
        args: JSON-serializable arguments, passed via --args.

    Returns:
        The parsed JSON response dict on success, otherwise
        {"success": False, "error": ...} for timeouts, missing binaries,
        non-zero exit, invalid JSON, or a null result.

    Note:
        Uses process groups (start_new_session=True) to ensure all child
        processes are killed on timeout, preventing zombie processes.
    """
    # NOTE(review): `os` is already imported at module level; this local
    # import is redundant but harmless.
    import os
    import signal
    proc = None
    try:
        # Candidate install locations for the recall package, checked in
        # order; the first containing src/recall/__main__.py wins.
        recall_paths = [
            Path(__file__).parent.parent,
            Path.home() / "Documents" / "Github" / "recall",
            Path.home() / ".local" / "share" / "recall",
            Path("/opt/recall"),
        ]
        recall_dir = None
        for path in recall_paths:
            if (path / "src" / "recall" / "__main__.py").exists():
                recall_dir = path
                break
        if recall_dir is None:
            # No known checkout found: let uv resolve `recall` from cwd.
            cmd = [
                "uv",
                "run",
                "python",
                "-m",
                "recall",
                "--call",
                tool_name,
                "--args",
                json.dumps(args),
            ]
        else:
            # Run against the located checkout explicitly.
            cmd = [
                "uv",
                "run",
                "--directory",
                str(recall_dir),
                "python",
                "-m",
                "recall",
                "--call",
                tool_name,
                "--args",
                json.dumps(args),
            ]
        # Use Popen with start_new_session=True to create a process group
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            cwd=recall_dir or Path.cwd(),
            start_new_session=True,
        )
        try:
            stdout, stderr = proc.communicate(timeout=5)
        except subprocess.TimeoutExpired:
            # Kill the entire process group to prevent zombie children
            # (the pgid equals proc.pid because of start_new_session=True).
            if proc.pid:
                try:
                    os.killpg(proc.pid, signal.SIGKILL)
                except (ProcessLookupError, PermissionError):
                    pass
            proc.wait()
            return {"success": False, "error": "recall timed out"}
        if proc.returncode != 0:
            return {"success": False, "error": f"recall failed: {stderr}"}
        parsed = json.loads(stdout)
        if parsed is None:
            # --call may emit JSON null; treat that as a failure.
            return {"success": False, "error": "recall returned null"}
        return parsed
    except json.JSONDecodeError as e:
        return {"success": False, "error": f"Invalid JSON response: {e}"}
    except FileNotFoundError:
        return {"success": False, "error": "uv or python not found"}
    except Exception as e:
        return {"success": False, "error": str(e)}
    finally:
        # Safety net: if the child is somehow still running on any exit
        # path, kill its whole process group before returning.
        if proc is not None and proc.poll() is None:
            try:
                os.killpg(proc.pid, signal.SIGKILL)
            except (ProcessLookupError, PermissionError, OSError):
                pass
            proc.wait()
def record_stop_event(session_id: str, namespace: str, blocked: bool = False) -> None:
    """Append a stop/blocked event line to the hook log (best effort).

    Used for analytics/tracking of session stop patterns; all failures
    are silently ignored so logging can never break the hook.
    """
    try:
        log_dir = Path.home() / ".claude" / "hooks" / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now(tz=timezone.utc).isoformat()
        status = "BLOCKED" if blocked else "STOP"
        entry = f"{timestamp} | {status} | session={session_id} | namespace={namespace}\n"
        with (log_dir / "recall-stop.log").open("a") as f:
            f.write(entry)
    except Exception:
        pass
def main():
    """Main hook entry point.

    The Stop hook is primarily for:
    1. Recording that the agent stopped (analytics)
    2. Optionally blocking the stop if work is incomplete

    Heavy analysis is deferred to SessionEnd hook.

    Reads the hook payload from stdin, runs completion checks against the
    full transcript, and prints either a "block" decision or a reflection
    systemMessage as JSON on stdout. Errors are logged and swallowed so
    the hook never breaks the host agent.
    """
    try:
        hook_input = read_hook_input()
        # Accept both snake_case and camelCase keys (Claude Code / Factory).
        session_id = hook_input.get("session_id") or hook_input.get(
            "sessionId",
            "unknown",
        )
        transcript_path = hook_input.get("transcript_path") or hook_input.get(
            "transcriptPath",
        )
        cwd = hook_input.get("cwd", str(Path.cwd()))
        stop_hook_active = hook_input.get("stop_hook_active", False)
        # Prevent infinite loops - if stop hook already ran, don't block again
        if stop_hook_active:
            return
        # Change to session's working directory so namespace detection and
        # git checks see the project the agent was working in.
        if cwd:
            os.chdir(cwd)
        namespace = get_project_namespace()
        # Read full transcript for comprehensive checks
        full_transcript = read_full_transcript(transcript_path)
        # Check if work is complete before allowing stop
        if full_transcript:
            completion_check = check_completion(full_transcript, cwd, session_id)
            if completion_check and completion_check.get("decision") == "block":
                # Block the stop and provide reason to continue
                record_stop_event(session_id, namespace, blocked=True)
                # Output JSON to block the stop
                output = {
                    "decision": "block",
                    "reason": completion_check["reason"],
                }
                print(json.dumps(output))
                sys.stdout.flush()
                return
        # Record the stop event
        record_stop_event(session_id, namespace, blocked=False)
        # Read recent transcript for learning analysis
        transcript_tail = read_transcript_tail(transcript_path, lines=30)
        if transcript_tail:
            # Light analysis for immediate learnings
            learnings = analyze_for_learnings(transcript_tail)
            # Store any immediate learnings using DaemonClient (with auto-fallback)
            if learnings:
                if DaemonClient is not None:
                    # Fast path: IPC to the running recall daemon.
                    with DaemonClient() as client:
                        for learning in learnings:
                            client.store(
                                content=learning["content"],
                                namespace=namespace,
                                memory_type=learning.get("type", "pattern"),
                                importance=0.6,
                                metadata={
                                    "source": "recall-stop",
                                    "session_id": session_id,
                                },
                            )
                else:
                    # Fallback to subprocess if DaemonClient unavailable
                    for learning in learnings:
                        call_recall(
                            "memory_store",
                            {
                                "content": learning["content"],
                                "memory_type": learning.get("type", "pattern"),
                                "namespace": namespace,
                                "importance": 0.6,
                                "metadata": {
                                    "source": "recall-stop",
                                    "session_id": session_id,
                                },
                            },
                        )
        # Output memory reflection prompt for Claude
        reflection_prompt = """🛑 STOP - Memory Operations REQUIRED
You MUST evaluate learnings from this interaction before stopping:
- Patterns discovered or reinforced
- User preferences revealed (explicit or implicit)
- Technical decisions made and their rationale
- Bugs fixed that could recur
## Available Memory Tools (via recall MCP wrapper)
| Tool | Purpose |
|------|---------|
| `memory_store` | Store with semantic indexing + auto-deduplication |
| `memory_recall` | Semantic search with graph expansion |
| `memory_validate` | Adjust confidence based on success/failure |
| `memory_apply` | Record memory usage (TRY phase) |
| `memory_outcome` | Record result (LEARN phase) |
| `memory_relate` | Create relationships (supersedes, contradicts, etc.) |
| `memory_forget` | Delete with golden rule protection |
## Before Stopping
**Store new learnings:**
```javascript
await recall.memory_store_tool({
content: "...",
memory_type: "pattern", // preference, decision, golden_rule
importance: 0.7,
namespace: "project:name"
});
```
**If a memory was applied and worked:**
```javascript
await recall.memory_outcome({
memory_id: "...",
success: true,
outcome: "Applied successfully in this session"
});
```
**If new memory supersedes old one:**
```javascript
await recall.memory_relate({
source_id: "new_id",
target_id: "old_id",
relation_type: "supersedes"
});
```
Use mcp__mcp-exec__execute_code_with_wrappers with wrappers: ["recall"]"""
        output = {"systemMessage": reflection_prompt}
        print(json.dumps(output))
        sys.stdout.flush()
    except BrokenPipeError:
        # Host closed the pipe; nothing useful left to do.
        pass
    except Exception as e:
        # Log error but don't block
        try:
            log_path = Path.home() / ".claude" / "hooks" / "logs" / "recall-stop.log"
            log_path.parent.mkdir(parents=True, exist_ok=True)
            with log_path.open("a") as f:
                f.write(f"{datetime.now(tz=timezone.utc).isoformat()} | ERROR | {e}\n")
        except Exception:
            pass
# Script entry point: run the Stop hook when executed directly (the hook
# runner invokes this file as a standalone uv script).
if __name__ == "__main__":
    main()