#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = []
# ///
"""Claude Code / Factory PostToolUse hook for tracking tool operations.
This hook runs after tool calls complete and records activity in recall.
Tracks file operations, searches, web fetches, and subagent tasks.
ENHANCED: Now provides feedback to Claude via additionalContext output.
MIGRATED: Uses DaemonClient for IPC with automatic subprocess fallback.
Supported tools:
Task, Bash, Glob, Grep, Read, Write, Edit, MultiEdit,
WebFetch, WebSearch, mcp__*
Usage:
Configure in ~/.claude/settings.json (Claude Code) or
~/.factory/settings.json (Factory)::
{
"hooks": {
"PostToolUse": [
{
"matcher": "Task|Bash|Glob|Grep|Read|Write|Edit|...",
"hooks": [
{
"type": "command",
"command": "python /path/to/recall-track.py",
"timeout": 5
}
]
}
]
}
}
Input (via stdin JSON):
{
"tool_name": "Write",
"tool_input": {"file_path": "/path/to/file.py", "content": "..."},
"tool_response": {"success": true, "filePath": "/path/to/file.py"},
"session_id": "abc123",
"cwd": "/project/root"
}
Output (to stdout JSON):
{
"hookSpecificOutput": {
"hookEventName": "PostToolUse",
"additionalContext": "Note: file.py modified 4 times..."
}
}
The hook extracts relevant info and stores it in recall.
Failures are handled gracefully to avoid blocking the agent.
"""
from __future__ import annotations
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any
# Import DaemonClient for IPC communication
from recall_client import DaemonClient
# =============================================================================
# Constants
# =============================================================================
SESSION_ACTIVITY_FILE = Path("/tmp/claude-session-activity.json")
PROJECT_INDICATORS = (
".git",
"pyproject.toml",
"package.json",
"Cargo.toml",
"go.mod",
".project",
"pom.xml",
"build.gradle",
)
FILE_TOOLS = frozenset({"Read", "Write", "Edit", "MultiEdit"})
SEARCH_TOOLS = frozenset({"Glob", "Grep"})
WEB_TOOLS = frozenset({"WebFetch", "WebSearch"})
MODIFICATION_ACTIONS = frozenset({"write", "edit", "multiedit"})
TEST_MARKERS = frozenset({"test_", "_test.", ".test.", ".spec."})
MOD_COUNT_THRESHOLD = 3
PATTERN_TRUNCATE_LENGTH = 50
URL_TRUNCATE_LENGTH = 100
COMMAND_TRUNCATE_LENGTH = 100
DESC_TRUNCATE_LENGTH = 50
# =============================================================================
# Data Classes
# =============================================================================
@dataclass(frozen=True, slots=True)
class HookInput:
"""Parsed hook input from Claude Code.
Attributes:
tool_name: Name of the tool that was invoked.
tool_input: Input parameters passed to the tool.
tool_response: Response returned by the tool.
session_id: Current session identifier.
cwd: Current working directory.
"""
tool_name: str
tool_input: dict[str, Any]
tool_response: dict[str, Any]
session_id: str | None
cwd: str
@classmethod
def from_dict(cls, data: dict[str, Any]) -> HookInput:
"""Create HookInput from raw dictionary.
Args:
data: Raw hook input dictionary with camelCase or snake_case keys.
Returns:
Parsed HookInput instance.
"""
import os
return cls(
tool_name=data.get("tool_name") or data.get("toolName", ""),
tool_input=data.get("tool_input") or data.get("toolInput", {}),
tool_response=data.get("tool_response") or data.get("toolResponse", {}),
session_id=data.get("session_id") or data.get("sessionId"),
cwd=data.get("cwd", os.getcwd()),
)
# =============================================================================
# Input/Output Functions
# =============================================================================
def run_background(data_file: str) -> None:
"""Background worker that performs actual tracking work.
Reads hook input from a temporary file, deletes it, then processes
the tracking asynchronously.
Args:
data_file: Path to temporary JSON file containing hook input.
"""
import json
import os
try:
with open(data_file, encoding="utf-8") as f:
hook_input = json.load(f)
os.unlink(data_file)
_do_tracking(hook_input)
except Exception:
pass
def read_hook_input() -> dict[str, Any]:
"""Read hook input from stdin.
Claude Code passes hook data as JSON via stdin.
Returns:
Dictionary with hook input data, or empty dict if unavailable.
"""
import json
if sys.stdin.isatty():
return {}
try:
stdin_data = sys.stdin.read()
if stdin_data:
return json.loads(stdin_data)
except (OSError, json.JSONDecodeError):
pass
return {}
# =============================================================================
# File Path Utilities
# =============================================================================
def get_file_type(file_path: str) -> str | None:
"""Get file type from extension.
Args:
file_path: Path to the file.
Returns:
File extension (e.g., '.py', '.ts') or None if no extension.
"""
ext = Path(file_path).suffix.lower()
return ext if ext else None
def extract_file_path(tool_name: str, tool_input: dict[str, Any]) -> str | None:
"""Extract file path from tool input.
Args:
tool_name: Name of the tool (Read, Write, Edit, MultiEdit).
tool_input: Tool input dictionary.
Returns:
File path or None if not found.
"""
return tool_input.get("file_path")
def get_action(tool_name: str) -> str:
"""Convert tool name to action string.
Args:
tool_name: Name of the tool.
Returns:
Action string (read, write, edit, multiedit).
"""
return tool_name.lower()
def find_project_root(file_path: str) -> str | None:
"""Find project root from file path.
Walks up directory tree looking for project indicators like .git,
pyproject.toml, package.json, etc.
Args:
file_path: Path to the file.
Returns:
Project root path or None if not found.
"""
try:
current = Path(file_path).resolve().parent
while current != current.parent:
for indicator in PROJECT_INDICATORS:
if (current / indicator).exists():
return str(current)
current = current.parent
except Exception:
pass
return None
# =============================================================================
# Daemon Client Integration
# =============================================================================
def _get_daemon_client() -> DaemonClient:
"""Get a configured DaemonClient instance.
Returns:
DaemonClient with auto_fallback enabled for subprocess fallback.
"""
return DaemonClient(
connect_timeout=2.0,
request_timeout=4.0,
auto_fallback=True,
)
# =============================================================================
# Session Activity Tracking
# =============================================================================
def load_session_activity() -> dict[str, int]:
"""Load session activity from temp file.
Returns:
Dictionary mapping file paths to modification counts.
"""
import json
if not SESSION_ACTIVITY_FILE.exists():
return {}
try:
with open(SESSION_ACTIVITY_FILE, encoding="utf-8") as f:
return json.load(f)
except (OSError, json.JSONDecodeError):
return {}
def save_session_activity(activity: dict[str, int]) -> None:
"""Save session activity to temp file.
Args:
activity: Dictionary mapping file paths to modification counts.
"""
import json
try:
SESSION_ACTIVITY_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(SESSION_ACTIVITY_FILE, "w", encoding="utf-8") as f:
json.dump(activity, f)
except Exception:
pass
def update_session_activity(file_path: str, action: str) -> int:
"""Update session activity for a file.
Args:
file_path: Path to the file that was modified.
action: Action performed (write, edit, multiedit).
Returns:
Total number of modifications to this file in this session.
"""
if action not in MODIFICATION_ACTIONS:
return 0
activity = load_session_activity()
activity[file_path] = activity.get(file_path, 0) + 1
save_session_activity(activity)
return activity[file_path]
# =============================================================================
# Test File Discovery
# =============================================================================
def _get_python_test_patterns(path: Path) -> list[Path]:
"""Get test file patterns for Python files.
Args:
path: Source file path.
Returns:
List of potential test file paths.
"""
patterns = [
path.parent / f"test_{path.name}",
path.parent / path.name.replace(".py", "_test.py"),
]
path_str = str(path)
for src_dir in ("/src/", "/lib/"):
if src_dir in path_str:
tests_path = Path(path_str.replace(src_dir, "/tests/"))
patterns.extend([
tests_path.parent / f"test_{tests_path.name}",
tests_path.parent / tests_path.name.replace(".py", "_test.py"),
])
return patterns
def _get_js_ts_test_patterns(path: Path) -> list[Path]:
"""Get test file patterns for JavaScript/TypeScript files.
Args:
path: Source file path.
Returns:
List of potential test file paths.
"""
suffix = path.suffix
patterns = [
path.parent / path.name.replace(suffix, f".test{suffix}"),
path.parent / path.name.replace(suffix, f".spec{suffix}"),
]
path_str = str(path)
if "/src/" in path_str:
for test_dir in ("/tests/", "/__tests__/"):
tests_path = Path(path_str.replace("/src/", test_dir))
patterns.extend([
tests_path.parent / tests_path.name.replace(suffix, f".test{suffix}"),
tests_path.parent / tests_path.name.replace(suffix, f".spec{suffix}"),
])
return patterns
def find_test_file(source_file: str) -> str | None:
"""Find corresponding test file for a source file.
Checks common test patterns:
- /src/ -> /tests/ or /test/
- file.py -> test_file.py or file_test.py
- file.ts -> file.test.ts or file.spec.ts
Args:
source_file: Path to the source file.
Returns:
Path to test file if it exists, None otherwise.
"""
path = Path(source_file)
if any(marker in path.name for marker in TEST_MARKERS):
return None
test_patterns: list[Path] = []
if path.suffix == ".py":
test_patterns.extend(_get_python_test_patterns(path))
elif path.suffix in {".ts", ".js", ".tsx", ".jsx"}:
test_patterns.extend(_get_js_ts_test_patterns(path))
for test_path in test_patterns:
if test_path.exists():
return str(test_path)
return None
# =============================================================================
# Lint/Test Command Suggestions
# =============================================================================
def get_lint_command(file_path: str, file_type: str | None) -> str | None:
"""Get suggested lint/test command for a file.
Args:
file_path: Path to the file.
file_type: File extension (e.g., '.py', '.ts').
Returns:
Suggested command or None.
"""
commands = {
".py": f"pytest {file_path}",
".ts": "npm run lint",
".js": "npm run lint",
".tsx": "npm run lint",
".jsx": "npm run lint",
".rs": "cargo test",
".go": "go test",
}
return commands.get(file_type)
# =============================================================================
# Feedback Generation
# =============================================================================
def generate_feedback(
tool_name: str,
tool_input: dict[str, Any],
tool_response: dict[str, Any],
session_activity: dict[str, int],
) -> str | None:
"""Generate feedback context for Claude.
Args:
tool_name: Name of the tool used.
tool_input: Tool input parameters.
tool_response: Tool response.
session_activity: Current session activity counts.
Returns:
Feedback string or None if no feedback needed.
"""
file_path = extract_file_path(tool_name, tool_input)
if not file_path:
return None
action = get_action(tool_name)
if action not in MODIFICATION_ACTIONS:
return None
file_type = get_file_type(file_path)
feedback_items: list[str] = []
mod_count = session_activity.get(file_path, 0)
if mod_count > MOD_COUNT_THRESHOLD:
feedback_items.append(
f"Note: {Path(file_path).name} modified {mod_count} times "
"this session. Consider consolidating changes.",
)
test_path = find_test_file(file_path)
if test_path:
feedback_items.append(f"Reminder: Test file at {test_path} may need updates.")
lint_cmd = get_lint_command(file_path, file_type)
if lint_cmd:
feedback_items.append(f"Consider running: {lint_cmd}")
return "\n".join(feedback_items) if feedback_items else None
# =============================================================================
# Activity Tracking Functions
# =============================================================================
def _write_log(log_path: Path, message: str) -> None:
"""Write a timestamped message to the log file.
Args:
log_path: Path to the log file.
message: Message to log.
"""
from datetime import datetime
timestamp = datetime.now().isoformat()
with open(log_path, "a", encoding="utf-8") as f:
f.write(f"{timestamp} | {message}\n")
def track_file_activity(
tool_name: str,
tool_input: dict[str, Any],
session_id: str | None,
cwd: str,
log_path: Path,
) -> None:
"""Track file-related tool activity using DaemonClient.
Args:
tool_name: Name of the file tool.
tool_input: Tool input parameters.
session_id: Current session ID.
cwd: Current working directory.
log_path: Path to the log file.
"""
file_path = extract_file_path(tool_name, tool_input)
if not file_path:
return
action = get_action(tool_name)
file_type = get_file_type(file_path)
project_root = find_project_root(file_path) or cwd
project_name = Path(project_root).name
_write_log(log_path, f"{action} | {file_path} | project={project_root}")
# Use DaemonClient for IPC with automatic subprocess fallback
with _get_daemon_client() as client:
# Store file activity as a memory observation
content = f"File {action}: {file_path}"
if file_type:
content += f" ({file_type})"
result = client.store(
content=content,
namespace=f"project:{project_name}",
memory_type="observation",
importance=0.3, # Low importance for routine file operations
metadata={
"source": "recall-track",
"action": action,
"file_path": file_path,
"file_type": file_type,
"session_id": session_id,
"project_root": project_root,
},
)
if not result.get("success"):
_write_log(log_path, f"WARN: {result.get('error', 'unknown error')}")
def track_search_activity(
tool_name: str,
tool_input: dict[str, Any],
session_id: str | None,
cwd: str,
log_path: Path,
) -> None:
"""Track search tool activity (Glob, Grep).
Args:
tool_name: Name of the search tool.
tool_input: Tool input parameters.
session_id: Current session ID.
cwd: Current working directory.
log_path: Path to the log file.
"""
pattern = tool_input.get("pattern", "") or str(tool_input.get("patterns", []))
path = tool_input.get("path", cwd)
truncated_pattern = pattern[:PATTERN_TRUNCATE_LENGTH]
_write_log(log_path, f"{tool_name.lower()} | pattern={truncated_pattern} | path={path}")
def track_web_activity(
tool_name: str,
tool_input: dict[str, Any],
tool_response: dict[str, Any],
session_id: str | None,
log_path: Path,
) -> None:
"""Track web tool activity (WebFetch, WebSearch).
Args:
tool_name: Name of the web tool.
tool_input: Tool input parameters.
tool_response: Tool response.
session_id: Current session ID.
log_path: Path to the log file.
"""
if tool_name == "WebFetch":
url = tool_input.get("url", "")[:URL_TRUNCATE_LENGTH]
_write_log(log_path, f"webfetch | url={url}")
elif tool_name == "WebSearch":
query = tool_input.get("query", "")[:URL_TRUNCATE_LENGTH]
_write_log(log_path, f"websearch | query={query}")
def track_task_activity(
tool_input: dict[str, Any],
tool_response: dict[str, Any],
session_id: str | None,
log_path: Path,
) -> None:
"""Track Task (subagent) activity.
Args:
tool_input: Tool input parameters.
tool_response: Tool response.
session_id: Current session ID.
log_path: Path to the log file.
"""
description = tool_input.get("description", "")[:DESC_TRUNCATE_LENGTH]
subagent_type = tool_input.get("subagent_type", "")
_write_log(log_path, f"task | type={subagent_type} | desc={description}")
def track_bash_activity(
tool_input: dict[str, Any],
tool_response: dict[str, Any],
session_id: str | None,
log_path: Path,
) -> None:
"""Track Bash command activity.
Args:
tool_input: Tool input parameters.
tool_response: Tool response.
session_id: Current session ID.
log_path: Path to the log file.
"""
command = tool_input.get("command", "")[:COMMAND_TRUNCATE_LENGTH]
_write_log(log_path, f"bash | cmd={command}")
def track_mcp_activity(
tool_name: str,
tool_input: dict[str, Any],
tool_response: dict[str, Any],
session_id: str | None,
log_path: Path,
) -> None:
"""Track MCP tool activity.
Args:
tool_name: Full MCP tool name (mcp__server__tool format).
tool_input: Tool input parameters.
tool_response: Tool response.
session_id: Current session ID.
log_path: Path to the log file.
"""
parts = tool_name.split("__")
server = parts[1] if len(parts) >= 2 else "unknown"
mcp_tool = parts[2] if len(parts) >= 3 else "unknown"
_write_log(log_path, f"mcp | server={server} | tool={mcp_tool}")
# =============================================================================
# Core Tracking Logic
# =============================================================================
def _do_tracking(hook_input: dict[str, Any]) -> None:
"""Perform actual tracking work (runs in background).
Routes tool activity to appropriate tracking function based on tool type.
Args:
hook_input: Raw hook input dictionary.
"""
from datetime import datetime
parsed = HookInput.from_dict(hook_input)
log_path = Path.home() / ".claude" / "hooks" / "logs" / "recall-track.log"
log_path.parent.mkdir(parents=True, exist_ok=True)
try:
if parsed.tool_name in FILE_TOOLS:
track_file_activity(
parsed.tool_name,
parsed.tool_input,
parsed.session_id,
parsed.cwd,
log_path,
)
elif parsed.tool_name in SEARCH_TOOLS:
track_search_activity(
parsed.tool_name,
parsed.tool_input,
parsed.session_id,
parsed.cwd,
log_path,
)
elif parsed.tool_name in WEB_TOOLS:
track_web_activity(
parsed.tool_name,
parsed.tool_input,
parsed.tool_response,
parsed.session_id,
log_path,
)
elif parsed.tool_name == "Task":
track_task_activity(
parsed.tool_input,
parsed.tool_response,
parsed.session_id,
log_path,
)
elif parsed.tool_name == "Bash":
track_bash_activity(
parsed.tool_input,
parsed.tool_response,
parsed.session_id,
log_path,
)
elif parsed.tool_name.startswith("mcp__"):
track_mcp_activity(
parsed.tool_name,
parsed.tool_input,
parsed.tool_response,
parsed.session_id,
log_path,
)
# ALWAYS store tool activity - no exceptions
try:
os.chdir(parsed.cwd)
namespace = get_project_namespace()
with get_daemon_client() as client:
client.store(
content=f"Tool: {parsed.tool_name} in {namespace}",
namespace=namespace,
memory_type="session",
importance=0.1, # Very low - high volume
metadata={
"source": "recall-track",
"tool_name": parsed.tool_name,
"session_id": parsed.session_id,
},
)
except Exception:
pass # Don't fail if storage fails
except Exception as e:
try:
timestamp = datetime.now().isoformat()
with open(log_path, "a", encoding="utf-8") as f:
f.write(f"{timestamp} | ERROR: {e}\n")
except Exception:
pass
# =============================================================================
# Main Entry Point
# =============================================================================
def main() -> None:
"""Main hook entry point.
In foreground mode:
- Reads stdin
- Updates session activity
- Generates feedback
- Outputs to stdout
- Spawns background worker for tracking
In background mode:
- Performs actual tracking work
"""
if len(sys.argv) > 2 and sys.argv[1] == "--background":
run_background(sys.argv[2])
return
hook_input = read_hook_input()
if not hook_input:
return
_process_foreground(hook_input)
def _process_foreground(hook_input: dict[str, Any]) -> None:
"""Process hook input in foreground mode.
Generates feedback synchronously, then spawns background worker.
Args:
hook_input: Raw hook input dictionary.
"""
import json
import os
import subprocess
import tempfile
parsed = HookInput.from_dict(hook_input)
try:
file_path = extract_file_path(parsed.tool_name, parsed.tool_input)
if file_path:
action = get_action(parsed.tool_name)
update_session_activity(file_path, action)
session_activity = load_session_activity()
feedback = generate_feedback(
parsed.tool_name,
parsed.tool_input,
parsed.tool_response,
session_activity,
)
if feedback:
output = {
"hookSpecificOutput": {
"hookEventName": "PostToolUse",
"additionalContext": feedback,
},
}
print(json.dumps(output))
except Exception:
pass
try:
fd, temp_path = tempfile.mkstemp(suffix=".json", prefix="recall-track-")
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(hook_input, f)
subprocess.Popen(
[sys.executable, __file__, "--background", temp_path],
start_new_session=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
)
except Exception:
_do_tracking(hook_input)
if __name__ == "__main__":
main()