rotate_log.py•15.2 kB
"""Enhanced tool for rotating the active project's progress log with comprehensive auditability."""
from __future__ import annotations
import json
import uuid
from pathlib import Path
from typing import Any, Dict, Optional, List
from datetime import datetime
from scribe_mcp import server as server_module
from scribe_mcp.server import app
from scribe_mcp.tools.project_utils import load_active_project
# Note: generate_doc_templates imported for type checking only
# from scribe_mcp.tools.generate_doc_templates import generate_doc_templates
from scribe_mcp.utils.files import rotate_file, ensure_parent, verify_file_integrity
from scribe_mcp.utils.integrity import (
compute_file_hash,
create_rotation_metadata,
count_file_lines
)
from scribe_mcp.utils.audit import (
store_rotation_metadata,
get_audit_manager
)
from scribe_mcp.utils.rotation_state import (
get_state_manager,
get_next_sequence_number,
generate_rotation_id,
update_project_state
)
from scribe_mcp.templates import (
substitution_context,
create_rotation_context
)
from scribe_mcp.utils.time import format_utc
from scribe_mcp import reminders
@app.tool()
async def rotate_log(
suffix: Optional[str] = None,
custom_metadata: Optional[str] = None,
confirm: Optional[bool] = False,
dry_run: Optional[bool] = None
) -> Dict[str, Any]:
"""
Enhanced log rotation with comprehensive auditability and integrity verification.
SAFETY NOTE: This tool defaults to dry-run mode for safety. Use confirm=true
to perform actual rotation. Progress logs are never lost due to atomic
operations and preflight backups.
Args:
suffix: Optional suffix for the archive filename (default: "archive")
custom_metadata: JSON string of additional metadata to include
confirm: Must be True to perform actual rotation (default: False)
dry_run: If True, simulate rotation without making changes (default: True)
Returns:
Rotation result with dry-run information by default
"""
state_snapshot = await server_module.state_manager.record_tool("rotate_log")
project, _, recent = await load_active_project(server_module.state_manager)
reminders_payload: List[Dict[str, Any]] = []
# Parse custom metadata if provided
parsed_metadata = None
if custom_metadata:
try:
parsed_metadata = json.loads(custom_metadata)
except json.JSONDecodeError:
return {
"ok": False,
"error": "Invalid JSON in custom_metadata parameter",
"suggestion": "Ensure custom_metadata is valid JSON string",
"recent_projects": list(recent),
}
if not project:
return {
"ok": False,
"error": "No project configured.",
"suggestion": "Invoke set_project before rotating logs",
"recent_projects": list(recent),
"reminders": reminders_payload,
}
try:
# Initialize managers
audit_manager = get_audit_manager()
state_manager = get_state_manager()
# Validate progress log exists
progress_log_path = Path(project["progress_log"])
if not progress_log_path.exists():
return {
"ok": False,
"error": f"Progress log not found: {progress_log_path}",
"suggestion": "Create initial progress log entries before rotating",
"recent_projects": list(recent),
}
# Generate rotation metadata
rotation_start_time = datetime.utcnow()
rotation_timestamp = rotation_start_time.isoformat() + " UTC"
rotation_id = generate_rotation_id(project["name"])
# Analyze current log
file_hash, file_size = compute_file_hash(str(progress_log_path))
entry_count = count_file_lines(str(progress_log_path))
# Get sequence number and hash chain info
sequence_number = get_next_sequence_number(project["name"])
hash_chain_info = state_manager.get_hash_chain_info(project["name"])
previous_hash = hash_chain_info.get("last_hash")
if dry_run:
return {
"ok": True,
"dry_run": True,
"rotation_id": rotation_id,
"rotation_timestamp": rotation_timestamp,
"project": project["name"],
"current_log_path": str(progress_log_path),
"file_size": file_size,
"file_hash": file_hash,
"entry_count": entry_count,
"sequence_number": sequence_number,
"previous_hash": previous_hash,
"recent_projects": list(recent),
"archived_to": None, # No archive created in dry run
}
# Perform enhanced rotation
# Default to dry-run for safety unless explicitly confirmed OR dry_run is explicitly set
is_dry_run = dry_run if dry_run is not None else (not confirm)
if is_dry_run:
safe_rotation_id = rotation_id[:8]
archive_suffix = f"{(suffix or 'archive')}_{safe_rotation_id}"
potential_archive = await rotate_file(
progress_log_path,
archive_suffix,
confirm=confirm,
dry_run=True
)
file_info = verify_file_integrity(progress_log_path)
return {
"ok": True,
"dry_run": True,
"archive_path": str(potential_archive),
"archived_to": str(potential_archive), # Add alias for backwards compatibility
"project": project["name"],
"rotation_id": rotation_id,
"current_file_size_mb": file_info.get("size_mb", 0),
"current_entry_count": file_info.get("line_count", 0),
"current_file_hash": file_info.get("sha256", ""),
"requires_confirmation": not confirm,
"suggestion": "Add confirm=true to perform actual rotation",
"recent_projects": list(recent),
"reminders": reminders_payload,
}
# Step 1: Create rotation context and template content first
rotation_context = create_rotation_context(
rotation_id=rotation_id,
rotation_timestamp=rotation_timestamp,
previous_log_path="ARCHIVE_PATH_PLACEHOLDER", # Will be updated after rotation
previous_log_hash="HASH_PLACEHOLDER", # Will be updated after rotation
previous_log_entries=str(entry_count),
current_sequence=str(sequence_number),
total_rotations=str(hash_chain_info.get("current_sequence", 0) + 1),
hash_chain_previous=previous_hash,
hash_chain_sequence=str(sequence_number),
hash_chain_root=hash_chain_info.get("root_hash") or ""
)
# Generate template context
template_context = substitution_context(
project_name=project["name"],
author=project.get("defaults", {}).get("agent", "Scribe"),
rotation_context=rotation_context
)
# Import template rendering functions for direct content generation
from scribe_mcp.tools.generate_doc_templates import _render_template
from scribe_mcp.templates import load_templates
# Load the progress log template
templates = await load_templates()
template_body = templates.get("progress_log", "")
# Generate the complete new log content in memory first
try:
new_log_content = _render_template(template_body, template_context)
except Exception as template_error:
print(f"Warning: Template generation failed: {template_error}")
# Fallback to basic content
new_log_content = f"""# Progress Log
## Rotation Notice
Previous log was archived with rotation ID: {template_context.get('ROTATION_ID', 'unknown')}
Created: {template_context.get('DATE_UTC', 'Unknown')}
Project: {template_context.get('project_name', 'Unknown Project')}
Author: {template_context.get('author', 'Scribe')}
---
"""
# Step 2: Bulletproof rotation with preflight backup
if not confirm:
return {
"ok": False,
"error": "Rotation requires explicit confirmation. Add confirm=true to proceed.",
"suggestion": "Use confirm=true to perform rotation, or dry_run=true to preview changes",
"recent_projects": list(recent),
"reminders": reminders_payload,
}
safe_rotation_id = rotation_id[:8]
archive_suffix = f"{(suffix or 'archive')}_{safe_rotation_id}"
archive_path = await rotate_file(
progress_log_path,
archive_suffix,
confirm=True,
dry_run=False,
template_content=new_log_content
)
# Step 2: Create comprehensive rotation metadata using actual archive path
rotation_metadata = create_rotation_metadata(
archived_file_path=str(archive_path),
rotation_uuid=rotation_id,
rotation_timestamp=rotation_timestamp,
sequence_number=sequence_number,
previous_hash=previous_hash
)
# Step 3: Update rotation context with actual archive information
rotation_context.update({
"previous_log_path": str(archive_path),
"previous_log_hash": compute_file_hash(str(archive_path)),
})
# Step 4: Enhanced audit trail - emit rotation journal entry
try:
from scribe_mcp.utils.files import WriteAheadLog
wal = WriteAheadLog(str(archive_path))
rotation_journal_entry = {
"op": "rotate",
"from": str(progress_log_path),
"to": str(archive_path),
"rotation_id": rotation_id,
"timestamp": rotation_timestamp,
"sequence": str(sequence_number),
"entries_rotated": str(entry_count)
}
wal.write_entry(rotation_journal_entry)
except Exception as wal_error:
print(f"Warning: Failed to write rotation journal entry: {wal_error}")
# Step 5: Store audit trail record
audit_success = store_rotation_metadata(project["name"], rotation_metadata)
# Step 6: Update rotation state
state_success = update_project_state(project["name"], rotation_metadata)
# Step 7: Add custom metadata if provided
if parsed_metadata:
rotation_metadata.update(parsed_metadata)
# Calculate rotation performance
rotation_end_time = datetime.utcnow()
rotation_duration = (rotation_end_time - rotation_start_time).total_seconds()
if rotation_duration < 0:
# Clock adjustments can yield negative deltas; clamp to zero for reporting.
rotation_duration = 0.0
# Step 8: Get reminders
reminders_payload = await reminders.get_reminders(
project,
tool_name="rotate_log",
state=state_snapshot,
)
return {
"ok": True,
"rotation_completed": True,
"rotation_id": rotation_id,
"rotation_timestamp": rotation_timestamp,
"project": project["name"],
"sequence_number": sequence_number,
"archive_path": str(archive_path),
"archived_to": str(archive_path), # Add alias for backwards compatibility
"archive_hash": rotation_metadata["file_hash"],
"archive_size": rotation_metadata["file_size"],
"entry_count": rotation_metadata["entry_count"],
"rotation_duration_seconds": rotation_duration,
"hash_chain_previous": previous_hash,
"audit_trail_stored": audit_success,
"state_updated": state_success,
"integrity_verified": True,
"template_generated": True,
"atomic_template_used": True,
"recent_projects": list(recent),
"reminders": reminders_payload,
}
except Exception as exc:
return {
"ok": False,
"error": f"Enhanced rotation failed: {exc}",
"error_type": type(exc).__name__,
"suggestion": "Check file permissions and disk space",
"recent_projects": list(recent),
"reminders": reminders_payload,
}
@app.tool()
async def verify_rotation_integrity(rotation_id: str) -> Dict[str, Any]:
"""
Verify the integrity of a specific rotation.
Args:
rotation_id: UUID of the rotation to verify
"""
state_snapshot = await server_module.state_manager.record_tool("verify_rotation_integrity")
project, _, recent = await load_active_project(server_module.state_manager)
if not project:
return {
"ok": False,
"error": "No project configured.",
"suggestion": "Invoke set_project before verifying rotations",
"recent_projects": list(recent),
}
try:
audit_manager = get_audit_manager()
is_valid, message = audit_manager.verify_rotation_integrity(
project["name"], rotation_id
)
return {
"ok": True,
"rotation_id": rotation_id,
"project": project["name"],
"integrity_valid": is_valid,
"verification_message": message,
"verified_at": format_utc(),
"recent_projects": list(recent),
}
except Exception as exc:
return {
"ok": False,
"error": f"Integrity verification failed: {exc}",
"rotation_id": rotation_id,
"recent_projects": list(recent),
}
@app.tool()
async def get_rotation_history(limit: int = 10) -> Dict[str, Any]:
"""
Get rotation history for the active project.
Args:
limit: Maximum number of rotations to return
"""
state_snapshot = await server_module.state_manager.record_tool("get_rotation_history")
project, _, recent = await load_active_project(server_module.state_manager)
if not project:
return {
"ok": False,
"error": "No project configured.",
"suggestion": "Invoke set_project before querying rotation history",
"recent_projects": list(recent),
}
try:
audit_manager = get_audit_manager()
history = audit_manager.get_rotation_history(
project["name"], limit=limit
)
return {
"ok": True,
"project": project["name"],
"rotation_count": len(history),
"rotations": history,
"queried_at": format_utc(),
"recent_projects": list(recent),
}
except Exception as exc:
return {
"ok": False,
"error": f"Failed to get rotation history: {exc}",
"recent_projects": list(recent),
}