"""Flexible log rotation tools leveraging shared logging utilities."""
from __future__ import annotations
import asyncio
import json
import re
import uuid
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Tuple
from scribe_mcp.utils.time import format_utc, utcnow
from scribe_mcp import server as server_module
from scribe_mcp.server import app
from scribe_mcp.config.log_config import load_log_config
from scribe_mcp.config import settings  # NOTE: assumed import path; provides the `settings.project_root` fallback used by the rotation helpers below
from scribe_mcp.utils.bulk_processor import BulkProcessor
from scribe_mcp.shared.base_logging_tool import LoggingToolMixin
from scribe_mcp.shared.logging_utils import (
LoggingContext,
ProjectResolutionError,
resolve_log_definition as shared_resolve_log_definition,
resolve_logging_context,
)
from scribe_mcp.template_engine import Jinja2TemplateEngine, TemplateEngineError
from scribe_mcp.templates import (
TEMPLATE_FILENAMES,
create_rotation_context,
load_templates,
substitution_context,
)
from scribe_mcp.tools.base.parameter_normalizer import normalize_dict_param
from scribe_mcp.utils.parameter_validator import ToolValidator, BulletproofParameterCorrector
from scribe_mcp.utils.estimator import (
EntryCountEstimate,
FileSizeEstimator,
ThresholdEstimator
)
from scribe_mcp.utils.error_handler import ErrorHandler, HealingErrorHandler, ExceptionHealer
from scribe_mcp.utils.config_manager import ConfigManager, apply_response_defaults, build_response_payload, BulletproofFallbackManager
from scribe_mcp.tools.config.rotate_log_config import RotateLogConfig
from scribe_mcp.utils.audit import get_audit_manager, store_rotation_metadata
from scribe_mcp.utils import audit as audit_utils
from scribe_mcp.utils.files import rotate_file, verify_file_integrity, file_lock
from scribe_mcp.utils.integrity import (
create_rotation_metadata,
count_file_lines,
compute_file_hash,
)
from scribe_mcp.utils.rotation_state import (
get_next_sequence_number,
get_state_manager,
generate_rotation_id,
update_project_state,
)
from scribe_mcp import reminders
DEFAULT_AUTO_THRESHOLD_ENTRIES = 500
DEFAULT_BYTES_PER_LINE = 80.0
MIN_BYTES_PER_LINE = 16.0
MAX_BYTES_PER_LINE = 512.0
EMA_SMOOTHING_ALPHA = 0.2
ESTIMATION_BAND_RATIO = 0.1
ESTIMATION_BAND_MIN = 250
TAIL_SAMPLE_BYTES = 1024 * 1024
_SUFFIX_SANITIZER = re.compile(r"[^A-Za-z0-9._-]+")
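# Worked example of the size-based entry estimate (a sketch; downstream helpers may
# refine it with a tail sample or a full count): a 100_000-byte log divided by the
# default 80 bytes/line yields roughly 1_250 entries, comfortably above
# DEFAULT_AUTO_THRESHOLD_ENTRIES (500).  Observed bytes-per-line figures are assumed
# to be folded into the cached value with an exponential moving average, e.g.
# new_ema = 0.2 * observed + 0.8 * previous when EMA_SMOOTHING_ALPHA is 0.2.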
class _RotateLogHelper(LoggingToolMixin):
def __init__(self) -> None:
self.server_module = server_module
self.parameter_corrector = BulletproofParameterCorrector()
self.error_handler = ErrorHandler()
self.healing_error_handler = HealingErrorHandler()
_ROTATE_HELPER = _RotateLogHelper()
async def verify_rotation_integrity(rotation_id: str, project: str | None = None) -> Dict[str, Any]:
"""
Verify archive integrity for a prior rotation.
Backwards-compatible helper for CLI/tests that import from `scribe_mcp.tools.rotate_log`.
(Not an MCP tool entry-point.)
"""
state = await server_module.state_manager.load()
project_name = project or state.current_project
if not project_name:
return {"ok": False, "error": "No project configured"}
try:
integrity_ok, message = audit_utils.verify_rotation_integrity(project_name, rotation_id)
return {
"ok": True,
"project": project_name,
"rotation_id": rotation_id,
"integrity_valid": bool(integrity_ok),
"message": message,
}
except Exception as exc: # pragma: no cover - defensive
return {"ok": False, "error": str(exc), "project": project_name, "rotation_id": rotation_id}
async def get_rotation_history(limit: int | None = None, project: str | None = None) -> Dict[str, Any]:
"""
Return recent rotation history for the current (or explicit) project.
Backwards-compatible helper for CLI/tests that import from `scribe_mcp.tools.rotate_log`.
(Not an MCP tool entry-point.)
"""
state = await server_module.state_manager.load()
project_name = project or state.current_project
if not project_name:
return {"ok": False, "error": "No project configured"}
try:
rotations = audit_utils.get_rotation_history(project_name, limit=limit)
return {
"ok": True,
"project": project_name,
"rotation_count": len(rotations),
"rotations": rotations,
}
except Exception as exc: # pragma: no cover - defensive
return {"ok": False, "error": str(exc), "project": project_name}
# Global configuration manager for parameter handling
_CONFIG_MANAGER = ConfigManager("rotate_log")
# Phase 3 Enhanced utilities integration
_EXCEPTION_HEALER = ExceptionHealer()
_FALLBACK_MANAGER = BulletproofFallbackManager()
def _heal_rotate_log_parameters(
suffix: Optional[str] = None,
custom_metadata: Optional[Dict[str, Any]] = None,
confirm: bool = False,
dry_run: bool = False,
dry_run_mode: str = "estimate",
log_type: Optional[str] = None,
log_types: Optional[List[str]] = None,
rotate_all: bool = False,
auto_threshold: bool = False,
threshold_entries: Optional[int] = None,
config: Optional[Dict[str, Any]] = None
) -> tuple[dict, bool, List[str]]:
"""Heal rotate_log parameters using Phase 1 exception handling."""
healing_messages = []
healing_applied = False
# Define valid values for enum parameters
valid_dry_run_modes = {"estimate", "precise"}
valid_log_types = {"progress", "doc_updates", "security", "bugs"}
healed_params = {}
# Heal suffix parameter (string normalization)
if suffix is not None:
original_suffix = suffix
healed_suffix = str(suffix).strip()
# Sanitize suffix using existing regex
healed_suffix = _SUFFIX_SANITIZER.sub('_', healed_suffix)
if healed_suffix != original_suffix:
healing_applied = True
healing_messages.append(f"Auto-corrected suffix from '{original_suffix}' to '{healed_suffix}'")
healed_params["suffix"] = healed_suffix
else:
healed_params["suffix"] = None
# Heal custom_metadata parameter using Phase 1 corrector
healed_metadata = BulletproofParameterCorrector.correct_metadata_parameter(custom_metadata)
if healed_metadata != custom_metadata:
healing_applied = True
healing_messages.append(f"Auto-corrected custom_metadata parameter to valid dict")
healed_params["custom_metadata"] = healed_metadata
# Heal confirm parameter
original_confirm = confirm
healed_confirm = bool(confirm)
if isinstance(confirm, str):
healed_confirm = confirm.lower() in ("true", "1", "yes")
if healed_confirm != confirm:
healing_applied = True
healing_messages.append(f"Auto-corrected confirm from '{confirm}' to {healed_confirm}")
elif healed_confirm != original_confirm:
healing_applied = True
healing_messages.append(f"Auto-corrected confirm to boolean {healed_confirm}")
healed_params["confirm"] = healed_confirm
# Heal dry_run parameter
original_dry_run = dry_run
healed_dry_run = bool(dry_run)
if isinstance(dry_run, str):
healed_dry_run = dry_run.lower() in ("true", "1", "yes")
if healed_dry_run != dry_run:
healing_applied = True
healing_messages.append(f"Auto-corrected dry_run from '{dry_run}' to {healed_dry_run}")
elif healed_dry_run != original_dry_run:
healing_applied = True
healing_messages.append(f"Auto-corrected dry_run to boolean {healed_dry_run}")
healed_params["dry_run"] = healed_dry_run
# Heal dry_run_mode parameter
if dry_run_mode is not None:
original_dry_run_mode = dry_run_mode
healed_dry_run_mode = BulletproofParameterCorrector.correct_enum_parameter(
original_dry_run_mode, valid_dry_run_modes, "dry_run_mode", "estimate"
)
if healed_dry_run_mode != original_dry_run_mode:
healing_applied = True
healing_messages.append(f"Auto-corrected dry_run_mode from '{original_dry_run_mode}' to '{healed_dry_run_mode}'")
healed_params["dry_run_mode"] = healed_dry_run_mode
else:
healed_params["dry_run_mode"] = "estimate"
# Heal log_type parameter
if log_type is not None:
original_log_type = log_type
healed_log_type = BulletproofParameterCorrector.correct_enum_parameter(
original_log_type, valid_log_types, "log_type", "progress"
)
if healed_log_type != original_log_type:
healing_applied = True
healing_messages.append(f"Auto-corrected log_type from '{original_log_type}' to '{healed_log_type}'")
healed_params["log_type"] = healed_log_type
else:
healed_params["log_type"] = None
# Heal log_types array parameter
if log_types is not None:
original_log_types = log_types
healed_log_types = BulletproofParameterCorrector.correct_list_parameter(
original_log_types, "log_types"
)
if healed_log_types != original_log_types:
healing_applied = True
healing_messages.append(f"Auto-corrected log_types parameter from {original_log_types} to {healed_log_types}")
healed_params["log_types"] = healed_log_types
else:
healed_params["log_types"] = None
# Heal rotate_all parameter
original_rotate_all = rotate_all
healed_rotate_all = bool(rotate_all)
if isinstance(rotate_all, str):
healed_rotate_all = rotate_all.lower() in ("true", "1", "yes")
if healed_rotate_all != rotate_all:
healing_applied = True
healing_messages.append(f"Auto-corrected rotate_all from '{rotate_all}' to {healed_rotate_all}")
elif healed_rotate_all != original_rotate_all:
healing_applied = True
healing_messages.append(f"Auto-corrected rotate_all to boolean {healed_rotate_all}")
healed_params["rotate_all"] = healed_rotate_all
# Heal auto_threshold parameter
original_auto_threshold = auto_threshold
healed_auto_threshold = bool(auto_threshold)
if isinstance(auto_threshold, str):
healed_auto_threshold = auto_threshold.lower() in ("true", "1", "yes")
if healed_auto_threshold != auto_threshold:
healing_applied = True
healing_messages.append(f"Auto-corrected auto_threshold from '{auto_threshold}' to {healed_auto_threshold}")
elif healed_auto_threshold != original_auto_threshold:
healing_applied = True
healing_messages.append(f"Auto-corrected auto_threshold to boolean {healed_auto_threshold}")
healed_params["auto_threshold"] = healed_auto_threshold
# Heal threshold_entries parameter
if threshold_entries is not None:
original_threshold_entries = threshold_entries
healed_threshold_entries = BulletproofParameterCorrector.correct_numeric_parameter(
original_threshold_entries, 1, 10000, "threshold_entries", 500
)
if healed_threshold_entries != original_threshold_entries:
healing_applied = True
healing_messages.append(f"Auto-corrected threshold_entries from '{original_threshold_entries}' to '{healed_threshold_entries}'")
healed_params["threshold_entries"] = healed_threshold_entries
else:
healed_params["threshold_entries"] = None
# Heal config parameter - preserve RotateLogConfig objects if provided
if config is not None:
if isinstance(config, RotateLogConfig):
# Config object is already valid, don't convert to dict
healed_config = config
else:
# For non-RotateLogConfig objects, use basic healing but don't use correct_metadata_parameter
# as it will convert objects to {"value": "string_representation"}
if isinstance(config, dict):
# Dict is already in good shape, just heal the values within it
healed_config = {}
for key, value in config.items():
if key in ["suffix", "custom_metadata", "dry_run_mode", "log_type"]:
# String parameters
if value is not None:
healed_config[key] = str(value).strip()
else:
healed_config[key] = value
elif key in ["confirm", "dry_run", "rotate_all", "auto_threshold"]:
# Boolean parameters
if isinstance(value, str):
healed_config[key] = value.lower() in ("true", "1", "yes")
else:
healed_config[key] = bool(value) if value is not None else value
elif key in ["threshold_entries"]:
# Numeric parameters
try:
healed_config[key] = int(value) if value is not None else value
except (ValueError, TypeError):
healed_config[key] = 500 # default
elif key in ["log_types"]:
# List parameters
if isinstance(value, str):
healed_config[key] = [item.strip() for item in value.split(",") if item.strip()]
elif isinstance(value, list):
healed_config[key] = value
else:
healed_config[key] = None
else:
healed_config[key] = value
healing_applied = True
healing_messages.append(f"Auto-corrected config parameter dictionary values")
else:
# Convert other types to dict structure
healed_config = BulletproofParameterCorrector.correct_metadata_parameter(config)
healing_applied = True
healing_messages.append(f"Auto-corrected config parameter to valid dict")
# Try to convert healed dict to RotateLogConfig if it has the right structure
try:
if isinstance(healed_config, dict):
healed_config = RotateLogConfig.from_legacy_params(**healed_config)
healing_messages.append(f"Converted healed config dict to RotateLogConfig object")
except Exception as conversion_error:
healing_messages.append(f"Failed to convert healed config to RotateLogConfig: {conversion_error}, using dict")
healed_params["config"] = healed_config
else:
healed_params["config"] = None
return healed_params, healing_applied, healing_messages
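# Example (hedged) of consuming the healing helper above; the inputs are illustrative:
#
#     healed, applied, messages = _heal_rotate_log_parameters(
#         suffix="pre release!", confirm="yes"
#     )
#     # healed["suffix"] == "pre_release_", healed["confirm"] is True, and `messages`
#     # records each auto-correction that was applied.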
def _add_healing_info_to_rotate_response(
response: Dict[str, Any],
healing_applied: bool,
healing_messages: List[str]
) -> Dict[str, Any]:
"""Add healing information to rotate_log response if parameters were corrected."""
if healing_applied and healing_messages:
response["parameter_healing"] = {
"applied": True,
"messages": healing_messages,
"message": "Parameters auto-corrected using Phase 1 exception healing"
}
return response
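# When healing occurred, callers of rotate_log can expect an extra block shaped like:
#     {"parameter_healing": {"applied": True, "messages": [...], "message": "..."}}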
class RotationTarget(NamedTuple):
log_type: str
path: Path
definition: Dict[str, Any]
# Global estimator instances
_FILE_SIZE_ESTIMATOR = FileSizeEstimator(
default_bytes_per_line=DEFAULT_BYTES_PER_LINE,
min_bytes_per_line=MIN_BYTES_PER_LINE,
max_bytes_per_line=MAX_BYTES_PER_LINE,
tail_sample_bytes=TAIL_SAMPLE_BYTES
)
_THRESHOLD_ESTIMATOR = ThresholdEstimator()
async def _write_rotated_log_header(path: Path, content: str) -> None:
"""Write rendered rotation template to the freshly rotated log."""
def _write() -> None:
with file_lock(path, 'w', timeout=30.0) as handle:
handle.write(content)
if not content.endswith("\n"):
handle.write("\n")
await asyncio.to_thread(_write)
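# Design note: the blocking file_lock/write pair runs inside asyncio.to_thread so the
# event loop is never blocked while the rotated log's header is written under lock.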
def _validate_rotation_parameters(
suffix: Optional[str],
custom_metadata: Optional[str],
confirm: Optional[bool],
dry_run: Optional[bool],
dry_run_mode: Optional[str],
log_type: Optional[str],
log_types: Optional[List[str]],
rotate_all: Optional[bool],
auto_threshold: Optional[bool],
threshold_entries: Optional[int],
config: Optional[RotateLogConfig]
) -> Tuple[RotateLogConfig, Dict[str, Any]]:
"""
Validate and prepare rotation parameters using enhanced Phase 3 utilities.
This function replaces the monolithic parameter handling section of rotate_log
with bulletproof parameter validation and healing.
"""
try:
# Apply Phase 1 BulletproofParameterCorrector for initial parameter healing
healed_params = {}
healing_applied = False
# Define valid values for enum parameters
valid_dry_run_modes = {"estimate", "precise"}
valid_log_types = {"progress", "doc_updates", "security", "bugs"}
# Heal suffix parameter
if suffix:
healed_suffix = _ROTATE_HELPER.parameter_corrector.correct_message_parameter(suffix)
if healed_suffix != suffix:
healed_params["suffix"] = healed_suffix
healing_applied = True
# Heal custom_metadata parameter
if custom_metadata:
healed_metadata = _ROTATE_HELPER.parameter_corrector.correct_metadata_parameter(custom_metadata)
if healed_metadata != custom_metadata:
healed_params["custom_metadata"] = healed_metadata
healing_applied = True
# Heal dry_run_mode parameter
if dry_run_mode:
healed_dry_run_mode = _ROTATE_HELPER.parameter_corrector.correct_enum_parameter(
dry_run_mode, valid_dry_run_modes, field_name="dry_run_mode"
)
if healed_dry_run_mode != dry_run_mode:
healed_params["dry_run_mode"] = healed_dry_run_mode
healing_applied = True
# Heal log_type parameter
if log_type:
healed_log_type = _ROTATE_HELPER.parameter_corrector.correct_enum_parameter(
log_type, valid_log_types, field_name="log_type"
)
if healed_log_type != log_type:
healed_params["log_type"] = healed_log_type
healing_applied = True
# Heal log_types parameter
if log_types:
healed_log_types = _ROTATE_HELPER.parameter_corrector.correct_list_parameter(
log_types, field_name="log_types"
)
if healed_log_types != log_types:
healed_params["log_types"] = healed_log_types
healing_applied = True
# Heal threshold_entries parameter
if threshold_entries:
healed_threshold = _ROTATE_HELPER.parameter_corrector.correct_numeric_parameter(
threshold_entries, min_value=1, max_value=1000000, field_name="threshold_entries"
)
if healed_threshold != threshold_entries:
healed_params["threshold_entries"] = healed_threshold
healing_applied = True
# Apply fallbacks for corrected parameters
if healing_applied:
fallback_params = _FALLBACK_MANAGER.resolve_parameter_fallback(
"rotate_log", healed_params, context="parameter_validation"
)
healed_params.update(fallback_params)
# Update parameters with healed values
final_suffix = healed_params.get("suffix", suffix)
final_custom_metadata = healed_params.get("custom_metadata", custom_metadata)
final_confirm = healed_params.get("confirm", confirm)
final_dry_run = healed_params.get("dry_run", dry_run)
final_dry_run_mode = healed_params.get("dry_run_mode", dry_run_mode)
final_log_type = healed_params.get("log_type", log_type)
final_log_types = healed_params.get("log_types", log_types)
final_rotate_all = healed_params.get("rotate_all", rotate_all)
final_auto_threshold = healed_params.get("auto_threshold", auto_threshold)
final_threshold_entries = healed_params.get("threshold_entries", threshold_entries)
# Create configuration using dual parameter support
if config is not None:
# Create configuration from legacy parameters
legacy_config = RotateLogConfig.from_legacy_params(
suffix=final_suffix,
custom_metadata=final_custom_metadata,
confirm=final_confirm,
dry_run=final_dry_run,
dry_run_mode=final_dry_run_mode,
log_type=final_log_type,
log_types=final_log_types,
rotate_all=final_rotate_all,
auto_threshold=final_auto_threshold,
threshold_entries=final_threshold_entries
)
# Merge with provided config (legacy parameters take precedence)
config_dict = config.to_dict()
legacy_dict = legacy_config.to_dict()
for key, value in legacy_dict.items():
if value is not None:
config_dict[key] = value
final_config = RotateLogConfig(**config_dict)
else:
final_config = RotateLogConfig.from_legacy_params(
suffix=final_suffix,
custom_metadata=final_custom_metadata,
confirm=final_confirm,
dry_run=final_dry_run,
dry_run_mode=final_dry_run_mode,
log_type=final_log_type,
log_types=final_log_types,
rotate_all=final_rotate_all,
auto_threshold=final_auto_threshold,
threshold_entries=final_threshold_entries
)
return final_config, {"healing_applied": healing_applied, "healed_params": healed_params}
except Exception as e:
# Apply Phase 2 ExceptionHealer for parameter validation errors
healed_exception = _EXCEPTION_HEALER.heal_parameter_validation_error(
e, {
"suffix": suffix,
"dry_run_mode": dry_run_mode,
"log_type": log_type,
"threshold_entries": threshold_entries
}
)
if healed_exception["success"]:
# Use healed values from exception recovery
fallback_params = _FALLBACK_MANAGER.resolve_parameter_fallback(
"rotate_log", healed_exception["healed_values"], context="exception_healing"
)
# Create safe fallback configuration
safe_config = RotateLogConfig.from_legacy_params(
suffix=fallback_params.get("suffix", suffix),
custom_metadata=fallback_params.get("custom_metadata", custom_metadata),
confirm=fallback_params.get("confirm", confirm),
dry_run=fallback_params.get("dry_run", dry_run),
dry_run_mode=fallback_params.get("dry_run_mode", dry_run_mode),
log_type=fallback_params.get("log_type", log_type),
log_types=fallback_params.get("log_types", log_types),
rotate_all=fallback_params.get("rotate_all", rotate_all),
auto_threshold=fallback_params.get("auto_threshold", auto_threshold),
threshold_entries=fallback_params.get("threshold_entries", threshold_entries)
)
return safe_config, {
"healing_applied": True,
"exception_healing": True,
"healed_params": healed_exception["healed_values"],
"fallback_used": True
}
else:
# Ultimate fallback - use BulletproofFallbackManager
fallback_params = _FALLBACK_MANAGER.apply_emergency_fallback("rotate_log", {
"suffix": suffix,
"dry_run_mode": dry_run_mode,
"log_type": log_type,
"threshold_entries": threshold_entries or 500
})
emergency_config = RotateLogConfig.from_legacy_params(
suffix=fallback_params.get("suffix", suffix),
custom_metadata=fallback_params.get("custom_metadata", custom_metadata),
confirm=fallback_params.get("confirm", False), # Safe default
dry_run=fallback_params.get("dry_run", True), # Safe default
dry_run_mode=fallback_params.get("dry_run_mode", "estimate"),
log_type=fallback_params.get("log_type", "progress"),
log_types=fallback_params.get("log_types", log_types),
rotate_all=fallback_params.get("rotate_all", rotate_all),
auto_threshold=fallback_params.get("auto_threshold", auto_threshold),
threshold_entries=fallback_params.get("threshold_entries", 500)
)
return emergency_config, {
"healing_applied": True,
"emergency_fallback": True,
"fallback_params": fallback_params
}
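# Example (hedged) of dual-parameter precedence in the validator above: non-None
# legacy arguments override the matching fields of a supplied config object.
#
#     cfg = RotateLogConfig.from_legacy_params(log_type="progress", dry_run=True)
#     final_config, info = _validate_rotation_parameters(
#         suffix=None, custom_metadata=None, confirm=True, dry_run=None,
#         dry_run_mode=None, log_type=None, log_types=None, rotate_all=None,
#         auto_threshold=None, threshold_entries=None, config=cfg,
#     )
#     # final_config keeps log_type="progress" from cfg, while confirm=True from the
#     # legacy argument wins over whatever cfg carried.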
def _prepare_rotation_operation(
final_config: RotateLogConfig,
context,
project: Dict[str, Any]
) -> Dict[str, Any]:
"""
Prepare rotation operation with enhanced error handling and validation.
This function extracts the rotation preparation logic from the monolithic
rotate_log function and adds bulletproof error handling.
"""
try:
# Extract parameters from config
suffix = final_config.suffix
custom_metadata = final_config.custom_metadata
confirm = final_config.confirm
dry_run = final_config.dry_run
dry_run_mode = final_config.dry_run_mode
log_type = final_config.log_type
log_types = final_config.log_types
rotate_all = final_config.rotate_all
auto_threshold = final_config.auto_threshold
threshold_entries = final_config.threshold_entries
# Determine which log types to rotate
try:
if rotate_all:
# Rotate all configured log types
log_config = load_log_config(project["root"])
target_log_types = list(log_config.keys())
elif log_types:
# Use specified log types
target_log_types = log_types
elif log_type:
# Use single log type
target_log_types = [log_type]
else:
# Default to progress log
target_log_types = ["progress"]
except Exception as log_type_error:
# Try to heal log type determination error
healed_log_types = _EXCEPTION_HEALER.heal_parameter_validation_error(
log_type_error, {"rotate_all": rotate_all, "log_type": log_type, "log_types": log_types}
)
if healed_log_types["success"]:
target_log_types = healed_log_types["healed_values"].get("target_log_types", ["progress"])
else:
# Apply fallback log types
fallback_log_types = _FALLBACK_MANAGER.apply_context_aware_defaults(
"rotate_log", {"operation": "determine_log_types", "project": project}
)
target_log_types = fallback_log_types.get("target_log_types", ["progress"])
# Validate log types
valid_log_types = {"progress", "doc_updates", "security", "bugs"}
validated_log_types = []
for lt in target_log_types:
if lt in valid_log_types:
validated_log_types.append(lt)
else:
# Try to heal invalid log type
healed_lt = _ROTATE_HELPER.parameter_corrector.correct_enum_parameter(
lt, valid_log_types, field_name="log_type"
)
if healed_lt in valid_log_types:
validated_log_types.append(healed_lt)
if not validated_log_types:
validated_log_types = ["progress"] # Fallback to progress log
# Process custom metadata
processed_metadata = {}
try:
if custom_metadata:
# Try to parse as JSON
try:
processed_metadata = json.loads(custom_metadata)
except json.JSONDecodeError:
# Try to heal JSON parsing
healed_json = _EXCEPTION_HEALER.heal_parameter_validation_error(
ValueError("Invalid JSON in custom_metadata"),
{"custom_metadata": custom_metadata, "error_type": "json_decode"}
)
if healed_json["success"]:
metadata_str = healed_json["healed_values"].get("custom_metadata", "{}")
processed_metadata = json.loads(metadata_str)
else:
# Fallback to string metadata
processed_metadata = {"custom_metadata": custom_metadata, "json_parse_failed": True}
except Exception as metadata_error:
# Apply fallback metadata handling
fallback_metadata = _FALLBACK_MANAGER.apply_context_aware_defaults(
"rotate_log", {"custom_metadata": custom_metadata, "operation": "metadata_processing"}
)
processed_metadata = fallback_metadata.get("processed_metadata", {"error": str(metadata_error)})
# Determine rotation mode
if confirm is None:
final_confirm = auto_threshold or False
else:
final_confirm = confirm
if dry_run is None:
final_dry_run = not final_confirm
else:
final_dry_run = dry_run
# Validate dry run mode
if dry_run_mode and dry_run_mode not in {"estimate", "precise"}:
healed_mode = _ROTATE_HELPER.parameter_corrector.correct_enum_parameter(
dry_run_mode, {"estimate", "precise"}, field_name="dry_run_mode"
)
final_dry_run_mode = healed_mode if healed_mode else "estimate"
else:
final_dry_run_mode = dry_run_mode or "estimate"
# Set up rotation parameters for each log type
rotation_operations = []
for log_type_name in validated_log_types:
try:
# Get log file path (always normalize to Path)
if log_type_name == "progress":
log_path = Path(project["progress_log"])
else:
log_root = Path(project["root"])
log_path = log_root / f"{log_type_name}.log"
# Check if log file exists
if not log_path.exists():
# Skip non-existent logs with warning
rotation_operations.append({
"log_type": log_type_name,
"log_path": log_path,
"status": "skipped",
"reason": "file_not_found",
"warning": f"Log file {log_path} does not exist"
})
continue
# Get entry count for threshold checking
try:
if final_dry_run_mode == "precise":
# Precise count
entry_count = count_file_lines(log_path)
else:
# Estimate from file size (EntryCountEstimate is a value type, so reuse the
# same size-based heuristic as the dry-run path below)
size_bytes = log_path.stat().st_size
entry_count = int(size_bytes / DEFAULT_BYTES_PER_LINE) if size_bytes > 0 else 0
except Exception as count_error:
# Try to heal counting error
healed_count = _EXCEPTION_HEALER.heal_document_operation_error(
count_error, {"log_path": str(log_path), "operation": "count_entries"}
)
if healed_count["success"]:
entry_count = healed_count["healed_values"].get("entry_count", 0)
else:
# Apply fallback estimation
entry_count = 100 # Safe fallback estimate
# Check auto threshold
should_rotate = True
threshold_reason = None
if auto_threshold and threshold_entries:
if entry_count < threshold_entries:
should_rotate = False
threshold_reason = f"Below threshold: {entry_count} < {threshold_entries}"
# Prepare operation details
operation = {
"log_type": log_type_name,
"log_path": log_path,
"entry_count": entry_count,
"should_rotate": should_rotate,
"threshold_reason": threshold_reason,
"confirm": final_confirm,
"dry_run": final_dry_run,
"dry_run_mode": final_dry_run_mode,
"suffix": suffix,
"metadata": processed_metadata.copy()
}
if should_rotate:
rotation_operations.append(operation)
except Exception as op_error:
# Add error operation but continue with other log types
error_operation = {
"log_type": log_type_name,
"status": "error",
"error": str(op_error),
"warning": f"Failed to prepare rotation for {log_type_name}: {str(op_error)}",
# Provide a best-effort log_path when available so execution
# logic can still reason about this operation without crashing.
"log_path": Path(project["progress_log"]) if log_type_name == "progress" else None,
}
rotation_operations.append(error_operation)
# Return preparation results
return {
"rotation_operations": rotation_operations,
"validated_log_types": validated_log_types,
"final_confirm": final_confirm,
"final_dry_run": final_dry_run,
"final_dry_run_mode": final_dry_run_mode,
"processed_metadata": processed_metadata,
"preparation_complete": True
}
except Exception as e:
# Apply comprehensive exception healing for rotation preparation
healed_result = _EXCEPTION_HEALER.heal_complex_exception_combination(
e, {
"operation": "prepare_rotation_operation",
"config": final_config.to_dict(),
"project": project
}
)
if healed_result and healed_result.get("success") and "healed_values" in healed_result:
# Create emergency rotation operation; best-effort log_path for progress.
emergency_params = _FALLBACK_MANAGER.apply_emergency_fallback(
"rotate_log", healed_result["healed_values"]
)
try:
emergency_log_path = Path(project.get("progress_log", ""))
except Exception:
emergency_log_path = None
return {
"rotation_operations": [{
"log_type": "progress",
"status": "emergency_fallback",
"error": str(e),
"emergency_params": emergency_params,
"log_path": emergency_log_path,
}],
"validated_log_types": ["progress"],
"final_confirm": False,
"final_dry_run": True,
"final_dry_run_mode": "estimate",
"processed_metadata": {"emergency_fallback": True, "error": str(e)},
"preparation_complete": True,
"emergency_fallback": True
}
else:
return {
"rotation_operations": [],
"validated_log_types": [],
"final_confirm": False,
"final_dry_run": True,
"final_dry_run_mode": "estimate",
"processed_metadata": {"preparation_failed": True, "error": str(e)},
"preparation_complete": False,
"error": str(e)
}
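# Sketch of the preparation payload consumed by the executor below (keys taken from
# the return statements above; values illustrative):
#
#     {"rotation_operations": [{"log_type": "progress", "log_path": Path(...),
#                               "entry_count": 612, "should_rotate": True, ...}],
#      "validated_log_types": ["progress"], "final_confirm": False,
#      "final_dry_run": True, "final_dry_run_mode": "estimate",
#      "processed_metadata": {}, "preparation_complete": True}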
async def _execute_rotation_with_fallbacks(
rotation_prep: Dict[str, Any],
final_config: RotateLogConfig,
context,
project: Dict[str, Any]
) -> Dict[str, Any]:
"""
Execute rotation operations with comprehensive error handling and intelligent fallbacks.
This function extracts the rotation execution logic from the monolithic rotate_log
function and adds bulletproof error handling with multiple fallback strategies.
"""
try:
rotation_operations = rotation_prep["rotation_operations"]
final_confirm = rotation_prep["final_confirm"]
final_dry_run = rotation_prep["final_dry_run"]
processed_metadata = rotation_prep["processed_metadata"]
execution_results = []
successful_rotations = []
failed_rotations = []
skipped_rotations = []
# Process each rotation operation
for operation in rotation_operations:
try:
log_type = operation.get("log_type", "unknown")
log_path = operation.get("log_path")
# If we don't have a log_path (e.g., emergency/errored prep),
# record a structured failure and skip execution for this entry.
if log_path is None:
error_result = {
"log_type": log_type,
"status": "failed",
"error": "Missing log_path for rotation operation",
"operation_level_error": True,
}
failed_rotations.append(error_result)
execution_results.append(error_result)
continue
# Check if operation should be skipped
if not operation.get("should_rotate", True):
skipped_result = {
"log_type": log_type,
"status": "skipped",
"reason": operation.get("threshold_reason", "Unknown reason"),
"entry_count": operation.get("entry_count", 0)
}
skipped_rotations.append(skipped_result)
execution_results.append(skipped_result)
continue
# Prepare rotation details
rotation_details = {
"log_path": log_path,
"suffix": operation.get("suffix"),
"dry_run": final_dry_run,
"dry_run_mode": operation.get("dry_run_mode", "estimate"),
"metadata": operation.get("metadata", {}),
"entry_count": operation.get("entry_count", 0)
}
# Execute rotation with error handling
try:
if final_dry_run:
# Dry run execution
if operation.get("dry_run_mode") == "precise":
# Precise dry run: full line count
entry_count = count_file_lines(str(log_path))
else:
# Lightweight estimate based on file size
try:
size_bytes = log_path.stat().st_size if log_path.exists() else 0
except OSError:
size_bytes = 0
# Simple heuristic: avoid pulling in the full estimator stack here
entry_count = int(size_bytes / DEFAULT_BYTES_PER_LINE) if size_bytes > 0 else 0
dry_run_result = {
"log_type": log_type,
"status": "dry_run_complete",
"dry_run": True,
"entry_count": entry_count,
"estimated_size": log_path.stat().st_size if log_path.exists() else 0,
"would_rotate": entry_count > 0
}
execution_results.append(dry_run_result)
successful_rotations.append(dry_run_result)
else:
# Actual rotation execution
try:
rotation_id = generate_rotation_id(project["name"])
sequence_number = get_next_sequence_number(project["name"])
rotation_timestamp = format_utc(utcnow())
# Execute file rotation
repo_root = Path(project.get("root") or settings.project_root).resolve()
rotation_started = time.monotonic()
archive_path = await rotate_file(
log_path,
suffix=operation.get("suffix"),
confirm=True,
repo_root=repo_root,
)
rotation_duration = time.monotonic() - rotation_started
# Verify rotation integrity
archive_info = verify_file_integrity(archive_path, repo_root=repo_root)
integrity_ok = bool(archive_info.get("exists")) and not archive_info.get("error")
rotation_metadata = create_rotation_metadata(
archived_file_path=str(archive_path),
rotation_uuid=rotation_id,
rotation_timestamp=rotation_timestamp,
sequence_number=sequence_number,
log_type=log_type,
)
audit_success = store_rotation_metadata(project["name"], rotation_metadata)
state_success = update_project_state(project["name"], rotation_metadata)
rotation_result = {
"log_type": log_type,
"status": "rotated" if integrity_ok else "rotated_with_warnings",
"dry_run": False,
"original_path": str(log_path),
"archive_path": str(archive_path),
"entry_count": rotation_metadata.get("entry_count", operation.get("entry_count", 0)),
"rotation_id": rotation_id,
"sequence_number": sequence_number,
"rotation_completed": True,
"rotation_duration_seconds": rotation_duration,
"archive_hash": rotation_metadata.get("file_hash"),
"archive_sha256": rotation_metadata.get("file_hash"),
"archive_size_bytes": rotation_metadata.get("file_size"),
"integrity_verified": integrity_ok,
"audit_trail_stored": bool(audit_success),
"state_updated": bool(state_success),
}
if not integrity_ok:
rotation_result["warning"] = "Archive integrity verification failed"
execution_results.append(rotation_result)
successful_rotations.append(rotation_result)
except Exception as rotation_error:
# Try to heal rotation execution error
healed_rotation = _EXCEPTION_HEALER.heal_rotation_error(
rotation_error, {
"log_path": str(log_path),
"log_type": log_type,
"operation": "file_rotation"
}
)
if healed_rotation["success"]:
# Try alternative rotation method
try:
# Simple rotation fallback: rename current log and
# create a fresh file with a minimal rotation header.
fallback_suffix = operation.get(
"suffix",
f"rotated-{datetime.now().strftime('%Y%m%d-%H%M%S')}",
)
archive_path = log_path.with_suffix(
f".{fallback_suffix}{log_path.suffix}"
)
# Move current log to archive
await asyncio.to_thread(lambda: log_path.rename(archive_path))
# Write a simple rotation header into the new log file
try:
timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")
project_name = project.get("name", "Unknown Project")
header = (
"# Progress Log\n\n"
"## Rotation Notice\n"
f"Previous log was archived to: {archive_path.name}\n\n"
f"Rotation Time: {timestamp}\n"
f"Project: {project_name}\n\n"
"---\n\n"
)
await asyncio.to_thread(lambda: log_path.write_text(header))
except Exception:
# If header write fails, fall back to an empty file.
await asyncio.to_thread(lambda: log_path.write_text(""))
fallback_result = {
"log_type": log_type,
"status": "rotated_fallback",
"dry_run": False,
"original_path": str(log_path),
"archive_path": str(archive_path),
"fallback_method": True,
"healed_error": True
}
execution_results.append(fallback_result)
successful_rotations.append(fallback_result)
except Exception:
# Fallback failed
error_result = {
"log_type": log_type,
"status": "failed",
"error": str(rotation_error),
"healing_attempted": True,
"healing_failed": True
}
failed_rotations.append(error_result)
execution_results.append(error_result)
else:
# Rotation failed completely
error_result = {
"log_type": log_type,
"status": "failed",
"error": str(rotation_error),
"healing_attempted": True
}
failed_rotations.append(error_result)
execution_results.append(error_result)
except Exception as execution_error:
# Handle execution-level errors
healed_execution = _EXCEPTION_HEALER.heal_document_operation_error(
execution_error, {"operation": "rotation_execution", "log_type": log_type}
)
if healed_execution["success"]:
# Create minimal success result
minimal_result = {
"log_type": log_type,
"status": "completed_with_fallback",
"dry_run": final_dry_run,
"healed_execution": True,
"original_error": str(execution_error)
}
execution_results.append(minimal_result)
successful_rotations.append(minimal_result)
else:
# Execution failed
error_result = {
"log_type": log_type,
"status": "failed",
"error": str(execution_error),
"execution_level_error": True
}
failed_rotations.append(error_result)
execution_results.append(error_result)
except Exception as operation_error:
# Handle operation-level errors
error_result = {
"log_type": operation.get("log_type", "unknown"),
"status": "failed",
"error": str(operation_error),
"operation_level_error": True
}
failed_rotations.append(error_result)
execution_results.append(error_result)
# Prepare final response
response = {
"ok": len(successful_rotations) > 0 or len(skipped_rotations) > 0,
"rotation_executed": not final_dry_run,
"dry_run": final_dry_run,
"dry_run_mode": rotation_prep.get("final_dry_run_mode"),
"processed_log_types": rotation_prep["validated_log_types"],
"results": execution_results,
"summary": {
"total_operations": len(rotation_operations),
"successful": len(successful_rotations),
"failed": len(failed_rotations),
"skipped": len(skipped_rotations)
}
}
# Backwards-compat: historically rotate_log returned a single-file shape with
# `archived_to` plus a few top-level counters. Mirror those fields from the first
# result that produced an archive path so existing callers/tests keep working.
try:
primary = None
for item in execution_results:
if isinstance(item, dict) and (item.get("archive_path") or item.get("archived_to")):
primary = item
break
if primary is None:
for item in execution_results:
if isinstance(item, dict):
primary = item
break
if primary:
archived_to = primary.get("archived_to") or primary.get("archive_path")
if archived_to:
response.setdefault("archived_to", archived_to)
response.setdefault("archive_path", archived_to)
response.setdefault("rotation_id", primary.get("rotation_id"))
response.setdefault("estimated_entry_count", primary.get("estimated_entry_count"))
response.setdefault(
"entry_count",
primary.get("entry_count")
or primary.get("rotated_entry_count")
or primary.get("estimated_entry_count"),
)
response.setdefault("rotation_completed", primary.get("rotation_completed"))
response.setdefault("archive_hash", primary.get("archive_hash") or primary.get("archive_sha256") or primary.get("file_hash"))
response.setdefault("archive_sha256", primary.get("archive_sha256") or primary.get("archive_hash") or primary.get("file_hash"))
response.setdefault("integrity_verified", primary.get("integrity_verified"))
response.setdefault("audit_trail_stored", primary.get("audit_trail_stored"))
response.setdefault("state_updated", primary.get("state_updated"))
response.setdefault("rotation_duration_seconds", primary.get("rotation_duration_seconds"))
response.setdefault("sequence_number", primary.get("sequence_number"))
response.setdefault(
"entry_count_method",
primary.get("entry_count_method") or primary.get("estimated_entry_count_method") or "unknown",
)
mode = (response.get("dry_run_mode") or "estimate").lower()
if response.get("dry_run") is True:
response.setdefault("entry_count_approximate", mode != "precise")
response["entry_count_method"] = "full_count" if mode == "precise" else response.get("entry_count_method")
response.setdefault("file_hash", primary.get("file_hash"))
response.setdefault("sequence_number", 0)
except Exception:
pass
# Add warnings if any operations failed
if failed_rotations:
response["warnings"] = [f"Failed to rotate {r['log_type']}: {r.get('error', 'Unknown error')}" for r in failed_rotations]
return response
except Exception as e:
# Apply ultimate exception healing for rotation execution
healed_result = _EXCEPTION_HEALER.heal_emergency_exception(
e, {"operation": "rotation_execution", "project": project.get("name", "unknown")}
)
if healed_result and healed_result.get("success") and "healed_values" in healed_result:
# Create emergency rotation result
emergency_params = _FALLBACK_MANAGER.apply_emergency_fallback(
"rotate_log", healed_result["healed_values"]
)
return {
"ok": True,
"rotation_executed": False,
"dry_run": True,
"emergency_fallback": True,
"processed_log_types": ["progress"],
"results": [{
"log_type": "progress",
"status": "emergency_fallback",
"error": str(e),
"emergency_params": emergency_params
}],
"summary": {
"total_operations": 1,
"successful": 0,
"failed": 0,
"skipped": 1
},
"original_error": str(e)
}
else:
return {
"ok": False,
"error": f"Critical rotation error: {str(e)}",
"suggestion": "Check system configuration and try again",
"rotation_executed": False,
"dry_run": True,
"processed_log_types": [],
"results": [],
"summary": {
"total_operations": 0,
"successful": 0,
"failed": 0,
"skipped": 0
}
}
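# Sketch of a successful non-dry-run executor response, including the
# backwards-compatible single-file fields mirrored above (values illustrative):
#
#     {"ok": True, "rotation_executed": True, "dry_run": False,
#      "processed_log_types": ["progress"],
#      "results": [{"log_type": "progress", "status": "rotated", ...}],
#      "summary": {"total_operations": 1, "successful": 1, "failed": 0, "skipped": 0},
#      "archived_to": "<archive path>", "rotation_id": "<uuid>", "sequence_number": 7}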
@app.tool()
async def rotate_log(
project: Optional[str] = None,
suffix: Optional[str] = None,
custom_metadata: Optional[str] = None,
confirm: Optional[bool] = None,
dry_run: Optional[bool] = None,
dry_run_mode: Optional[str] = None,
log_type: Optional[str] = None,
log_types: Optional[List[str]] = None,
rotate_all: Optional[bool] = None,
auto_threshold: Optional[bool] = None,
threshold_entries: Optional[int] = None,
config: Optional[RotateLogConfig] = None, # Configuration object for enhanced parameter handling
**_kwargs: Any, # tolerate unknown kwargs (contract: tools never TypeError)
) -> Dict[str, Any]:
"""
Rotate one or more project log files with integrity guarantees.
Args:
project: Optional project name override (defaults to active project context).
suffix: Optional suffix for archive filenames.
custom_metadata: Optional JSON metadata appended to rotation record.
confirm: When True, perform actual rotation (required unless auto-threshold triggers).
dry_run: If True, preview rotation without changing files. Defaults to safe preview when confirm=False.
dry_run_mode: Controls dry-run accuracy. "estimate" (default) returns approximate counts; "precise" forces a full line count.
log_type: Single log type to rotate (e.g., "progress", "doc_updates").
log_types: List of log types to rotate.
rotate_all: When True, rotate every configured log type for the project.
auto_threshold: When True, only rotate logs whose entry count exceeds a threshold.
threshold_entries: Optional override for entry threshold (defaults to definition or 500).
config: Optional RotateLogConfig object for enhanced parameter handling.
ENHANCED FEATURES:
- Dual parameter support: use either legacy parameters OR a RotateLogConfig object
- Configuration mode: use RotateLogConfig for structured parameter management
- Legacy mode: pass individual parameters as before (fully backward compatible)
- Legacy parameters take precedence over the config object when both are provided
"""
# Phase 3 Task 3.5: Enhanced Function Decomposition
# This function now uses decomposed sub-functions with bulletproof error handling
try:
state_snapshot = await server_module.state_manager.record_tool("rotate_log")
except Exception:
state_snapshot = {}
try:
# === PHASE 3 ENHANCED PARAMETER VALIDATION AND PREPARATION ===
# Replace monolithic parameter handling with bulletproof validation and healing
final_config, validation_info = _validate_rotation_parameters(
suffix=suffix,
custom_metadata=custom_metadata,
confirm=confirm,
dry_run=dry_run,
dry_run_mode=dry_run_mode,
log_type=log_type,
log_types=log_types,
rotate_all=rotate_all,
auto_threshold=auto_threshold,
threshold_entries=threshold_entries,
config=config
)
# === CONTEXT RESOLUTION WITH ENHANCED ERROR HANDLING ===
try:
context = await _ROTATE_HELPER.prepare_context(
tool_name="rotate_log",
agent_id=None,
explicit_project=project,
require_project=True,
state_snapshot=state_snapshot,
)
except ProjectResolutionError as exc:
# Apply Phase 2 ExceptionHealer for project resolution errors
healed_context = _EXCEPTION_HEALER.heal_parameter_validation_error(
exc, {"tool_name": "rotate_log", "operation": "project_resolution"}
)
if healed_context["success"]:
# Try with healed context
try:
context = await _ROTATE_HELPER.prepare_context(
tool_name="rotate_log",
agent_id=None,
require_project=True,
state_snapshot=state_snapshot,
)
except Exception:
# Fallback response
payload = _ROTATE_HELPER.translate_project_error(exc)
payload = apply_response_defaults(payload, {
"suggestion": "Invoke set_project before rotating logs"
})
return payload
else:
payload = _ROTATE_HELPER.translate_project_error(exc)
payload = apply_response_defaults(payload, {
"suggestion": "Invoke set_project before rotating logs"
})
return payload
project = context.project or {}
# === ENHANCED ROTATION OPERATION PREPARATION ===
rotation_prep = _prepare_rotation_operation(final_config, context, project)
if not rotation_prep.get("preparation_complete", False):
# If preparation failed, try to continue with emergency rotation
if rotation_prep.get("emergency_fallback"):
# Execute emergency rotation
rotation_result = await _execute_rotation_with_fallbacks(
rotation_prep, final_config, context, project
)
rotation_result["parameter_healing"] = True
rotation_result["emergency_fallback"] = True
rotation_result["preparation_failed"] = True
return rotation_result
else:
# Return error if preparation completely failed
return {
"ok": False,
"error": "Failed to prepare rotation operation",
"details": rotation_prep.get("error", "Unknown preparation error"),
"suggestion": "Try with simpler rotation parameters"
}
# === ENHANCED ROTATION EXECUTION WITH FALLBACKS ===
rotation_result = await _execute_rotation_with_fallbacks(
rotation_prep, final_config, context, project
)
# Add validation info to result if healing was applied
if validation_info.get("healing_applied"):
rotation_result["parameter_healing"] = True
if validation_info.get("exception_healing"):
rotation_result["parameter_exception_healing"] = True
elif validation_info.get("emergency_fallback"):
rotation_result["parameter_emergency_fallback"] = True
else:
rotation_result["parameter_healing_applied"] = True
# Add preparation warnings if any
if rotation_prep.get("emergency_fallback"):
if "warnings" not in rotation_result:
rotation_result["warnings"] = []
rotation_result["warnings"].append("Emergency fallback applied during preparation")
return rotation_result
except Exception as e:
# === ULTIMATE EXCEPTION HANDLING AND FALLBACK ===
# Apply Phase 2 ExceptionHealer for unexpected errors
healed_result = _EXCEPTION_HEALER.heal_emergency_exception(
e, {
"operation": "rotate_log_main",
"project": project,
"tool": "rotate_log"
}
)
if healed_result and healed_result.get("success") and "healed_values" in healed_result:
# Create emergency rotation with healed parameters
emergency_params = _FALLBACK_MANAGER.apply_emergency_fallback(
"rotate_log", healed_result["healed_values"]
)
return {
"ok": True,
"rotation_executed": False,
"dry_run": True,
"emergency_fallback": True,
"processed_log_types": ["progress"],
"results": [{
"log_type": "progress",
"status": "emergency_fallback",
"error": str(e),
"emergency_params": emergency_params
}],
"summary": {
"total_operations": 1,
"successful": 0,
"failed": 0,
"skipped": 1
},
"original_error": str(e)
}
else:
return {
"ok": False,
"error": f"Critical error in rotate_log: {str(e)}",
"emergency_healing_failed": True,
"suggestion": "Check system configuration and try again",
"rotation_executed": False,
"dry_run": True,
"processed_log_types": [],
"results": []
}
def _sanitize_suffix(suffix: str) -> str:
"""Sanitize a user-supplied archive suffix to filesystem-safe characters."""
sanitized = _SUFFIX_SANITIZER.sub("_", str(suffix).strip())
return sanitized[:64] or "log"
def _build_archive_suffix(suffix: Optional[str], log_type: str, rotation_id: str) -> str:
safe_rotation_id = rotation_id.replace("-", "")[:8]
base = _sanitize_suffix(suffix) if suffix else f"{log_type}_archive"
return f"{base}_{safe_rotation_id}"
async def _build_template_content(
log_type: str,
project: Dict[str, Any],
rotation_context: Dict[str, Any],
) -> str:
if log_type != "progress":
timestamp = rotation_context.get("rotation_timestamp_utc", "Unknown time")
rotation_id = rotation_context.get("rotation_id", "unknown")
project_name = project.get("name", "Unknown Project")
return (
f"# {log_type.replace('_', ' ').title()} Log\n\n"
f"Log rotated on {timestamp} (rotation id {rotation_id}) for project {project_name}.\n\n"
)
template_context = substitution_context(
project_name=project["name"],
author=project.get("defaults", {}).get("agent", "Scribe"),
rotation_context=rotation_context,
)
template_engine = None
try:
project_root = Path(project.get("root", "")) if project.get("root") else Path.cwd()
template_engine = Jinja2TemplateEngine(
project_root=project_root,
project_name=project["name"],
security_mode="sandbox",
)
except Exception as engine_error: # pragma: no cover - defensive path
print(f"Warning: Failed to initialize Jinja2 template engine for rotation: {engine_error}")
template_name = f"documents/{TEMPLATE_FILENAMES['progress_log']}"
if template_engine:
try:
rendered = template_engine.render_template(template_name, metadata=template_context)
if rendered:
return rendered
except TemplateEngineError as render_error:
print(f"Warning: Jinja2 rendering failed for {template_name}: {render_error}")
from scribe_mcp.tools.generate_doc_templates import _render_template
templates = await load_templates()
template_body = templates.get("progress_log", "")
try:
rendered = _render_template(template_body, template_context)
if rendered:
return rendered
except Exception as template_error: # pragma: no cover - defensive
print(f"Warning: Template generation failed: {template_error}")
rotation_id = rotation_context.get("rotation_id", "unknown")
timestamp = rotation_context.get("rotation_timestamp_utc", "Unknown")
project_name = project.get("name", "Unknown Project")
author = project.get("defaults", {}).get("agent", "Scribe")
return (
"# Progress Log\n\n"
"## Rotation Notice\n"
f"Previous log was archived with rotation ID: {rotation_id}\n\n"
f"Created: {timestamp}\n"
f"Project: {project_name}\n"
f"Author: {author}\n\n"
"---\n\n"
)
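# Template resolution above is deliberately best-effort: sandboxed Jinja2 rendering
# first, then the legacy _render_template substitution, and finally a hard-coded
# rotation header so a freshly rotated progress log is never left empty.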
async def _rotate_single_log(
*,
project: Dict[str, Any],
context: LoggingContext,
state_manager,
audit_manager,
log_type: str,
log_path: Path,
definition: Dict[str, Any],
suffix: Optional[str],
parsed_metadata: Optional[Dict[str, Any]],
confirm: Optional[bool],
dry_run: Optional[bool],
dry_run_mode: Optional[str],
auto_threshold: bool,
threshold_entries: Optional[int],
) -> Dict[str, Any]:
result: Dict[str, Any] = {"log_type": log_type}
if not log_path.exists():
result.update({
"ok": False,
"error": f"Log file not found: {log_path}",
"suggestion": "Create initial log entries before rotating",
})
return result
normalized_mode = (dry_run_mode or "estimate").lower()
if normalized_mode not in {"estimate", "precise"}:
result.update({
"ok": False,
"error": f"Invalid dry_run_mode '{dry_run_mode}'. Use 'estimate' or 'precise'.",
"suggestion": "Set dry_run_mode to 'estimate' (default) or 'precise' for exact counts.",
})
return result
try:
snapshot = _snapshot_file_state(log_path)
except OSError as snapshot_error:
result.update({
"ok": False,
"error": f"Unable to inspect log file: {snapshot_error}",
"suggestion": "Verify file permissions and that the log path is readable.",
})
return result
cached_stats = state_manager.get_log_stats(project["name"], log_type)
cached_ema = cached_stats.get("ema_bytes_per_line") if cached_stats else None
cached_initialized = cached_stats.get("initialized") if cached_stats else False
entry_estimate = _estimate_entry_count(snapshot, cached_stats)
threshold_limit = _rotation_threshold_for_definition(definition, threshold_entries)
if auto_threshold and threshold_limit is None:
threshold_limit = DEFAULT_AUTO_THRESHOLD_ENTRIES
estimation_band = _compute_estimation_band(threshold_limit)
estimation_decision: Optional[str] = None
if auto_threshold and threshold_limit:
estimation_decision = _classify_estimate(entry_estimate.count, threshold_limit, estimation_band)
if estimation_decision == "undecided":
refined_estimate = _refine_entry_estimate(log_path, snapshot, entry_estimate)
if refined_estimate:
entry_estimate = refined_estimate
if not refined_estimate.approximate:
observed_bpl = _compute_bytes_per_line(snapshot["size_bytes"], refined_estimate.count)
state_manager.update_log_stats(
project["name"],
log_type,
size_bytes=snapshot["size_bytes"],
line_count=refined_estimate.count,
ema_bytes_per_line=observed_bpl,
mtime_ns=snapshot.get("mtime_ns"),
inode=snapshot.get("inode"),
source="tail_sample",
initialized=True,
)
estimation_decision = _classify_estimate(entry_estimate.count, threshold_limit, estimation_band)
if estimation_decision == "undecided" and normalized_mode == "precise":
precise_count = count_file_lines(str(log_path))
observed_bpl = _compute_bytes_per_line(snapshot["size_bytes"], precise_count)
state_manager.update_log_stats(
project["name"],
log_type,
size_bytes=snapshot["size_bytes"],
line_count=precise_count,
ema_bytes_per_line=observed_bpl,
mtime_ns=snapshot.get("mtime_ns"),
inode=snapshot.get("inode"),
source="precise_dry_run",
initialized=True,
)
entry_estimate = EntryCountEstimate(
count=precise_count,
approximate=False,
method="full_count",
details={**snapshot, "bytes_per_line": observed_bpl},
)
estimation_decision = _classify_estimate(entry_estimate.count, threshold_limit, estimation_band)
if auto_threshold and threshold_limit and estimation_decision == "below":
result.update({
"ok": True,
"rotation_skipped": True,
"reason": "threshold_not_reached",
"entry_count": entry_estimate.count,
"entry_count_approximate": entry_estimate.approximate,
"entry_count_method": entry_estimate.method,
"threshold_entries": threshold_limit,
"estimation_band": estimation_band,
"estimation_details": dict(entry_estimate.details),
"current_log_path": str(log_path),
"auto_threshold_triggered": False,
"estimation_decision": "below",
})
return result
auto_triggered = bool(auto_threshold and threshold_limit and estimation_decision == "above")
confirm_requested = bool(confirm)
should_rotate = confirm_requested or auto_triggered
dry_run_flag = dry_run if dry_run is not None else not should_rotate
if dry_run_flag and normalized_mode == "precise" and entry_estimate.approximate:
precise_count = count_file_lines(str(log_path))
observed_bpl = _compute_bytes_per_line(snapshot["size_bytes"], precise_count)
state_manager.update_log_stats(
project["name"],
log_type,
size_bytes=snapshot["size_bytes"],
line_count=precise_count,
ema_bytes_per_line=observed_bpl,
mtime_ns=snapshot.get("mtime_ns"),
inode=snapshot.get("inode"),
source="precise_dry_run",
initialized=True,
)
entry_estimate = EntryCountEstimate(
count=precise_count,
approximate=False,
method="full_count",
details={**snapshot, "bytes_per_line": observed_bpl},
)
sequence_number = get_next_sequence_number(project["name"])
hash_chain_info = state_manager.get_hash_chain_info(project["name"])
previous_hash = hash_chain_info.get("last_hash")
rotation_id = generate_rotation_id(project["name"])
rotation_timestamp = format_utc(utcnow())
if dry_run_flag:
archive_suffix = _build_archive_suffix(suffix, log_type, rotation_id)
potential_archive = await rotate_file(
log_path,
archive_suffix,
confirm=should_rotate,
dry_run=True,
repo_root=Path(project.get("root") or settings.project_root).resolve(),
)
current_size_mb = round(snapshot["size_bytes"] / (1024 * 1024), 3) if snapshot["size_bytes"] else 0.0
estimation_decision_label = estimation_decision or ("rotate" if should_rotate else "manual")
result.update({
"ok": True,
"dry_run": True,
"rotation_id": rotation_id,
"rotation_timestamp_utc": rotation_timestamp,
"rotation_timestamp": rotation_timestamp,
"project": project["name"],
"log_type": log_type,
"current_file_path": str(log_path),
"current_log_path": str(log_path),
"current_file_size_bytes": snapshot["size_bytes"],
"current_file_size_mb": current_size_mb,
"current_file_line_count": entry_estimate.count,
"current_file_sha256": cached_stats.get("sha256") if cached_stats else None,
"current_file_hash": cached_stats.get("sha256") if cached_stats else None,
"file_hash": cached_stats.get("sha256") if cached_stats else None,
"entry_count": entry_estimate.count,
"entry_count_approximate": entry_estimate.approximate,
"entry_count_method": entry_estimate.method,
"estimation_band": estimation_band,
"estimation_details": dict(entry_estimate.details),
"estimation_decision": estimation_decision_label,
"sequence_number": sequence_number,
"hash_chain_previous": previous_hash,
"hash_chain_root": hash_chain_info.get("root_hash"),
"hash_chain_sequence": sequence_number,
"archived_to": str(potential_archive),
"requires_confirmation": not should_rotate,
"auto_threshold_triggered": auto_triggered,
})
if threshold_limit:
result["threshold_entries"] = threshold_limit
return result
if not should_rotate:
result.update({
"ok": False,
"error": "Rotation requires explicit confirmation. Add confirm=true to proceed.",
"suggestion": "Use confirm=true to perform rotation, or dry_run=true to preview changes",
})
return result
rotation_start_time = datetime.utcnow()
archive_suffix = _build_archive_suffix(suffix, log_type, rotation_id)
archive_path = await rotate_file(
log_path,
archive_suffix,
confirm=True,
dry_run=False,
template_content=None,
repo_root=Path(project.get("root") or settings.project_root).resolve(),
)
archive_info = verify_file_integrity(
archive_path,
repo_root=Path(project.get("root") or settings.project_root).resolve(),
)
archive_hash = archive_info.get("sha256")
archive_size = archive_info.get("size_bytes")
rotated_entries = archive_info.get("line_count")
entry_count_method = "archive_scan"
entry_count_approximate = False
if rotated_entries is None:
rotated_entries = entry_estimate.count
entry_count_method = entry_estimate.method
entry_count_approximate = entry_estimate.approximate
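# Build the template context that records chain-of-custody details for the new log header.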
rotation_context = create_rotation_context(
rotation_id=rotation_id,
rotation_timestamp=rotation_timestamp,
previous_log_path=str(archive_path),
previous_log_hash=archive_hash or "",
previous_log_entries=str(rotated_entries),
current_sequence=str(sequence_number),
total_rotations=str(sequence_number),
hash_chain_previous=previous_hash or "",
hash_chain_sequence=str(sequence_number),
hash_chain_root=hash_chain_info.get("root_hash") or "",
)
rendered_template = await _build_template_content(log_type, project, rotation_context)
try: # Best-effort WAL entry
from scribe_mcp.utils.files import WriteAheadLog
wal = WriteAheadLog(archive_path)
rotation_journal_entry = {
"op": "rotate",
"from": str(log_path),
"to": str(archive_path),
"rotation_id": rotation_id,
"timestamp": rotation_timestamp,
"sequence": str(sequence_number),
"entries_rotated": str(rotated_entries),
"log_type": log_type,
}
wal.write_entry(rotation_journal_entry)
except Exception as wal_error: # pragma: no cover - defensive
print(f"Warning: Failed to write rotation journal entry: {wal_error}")
rotation_metadata = create_rotation_metadata(
archived_file_path=str(archive_path),
rotation_uuid=rotation_id,
rotation_timestamp=rotation_timestamp,
sequence_number=sequence_number,
previous_hash=previous_hash,
log_type=log_type,
)
if rotated_entries is not None:
rotation_metadata["entry_count"] = rotated_entries
if parsed_metadata:
rotation_metadata.update(parsed_metadata)
audit_success = store_rotation_metadata(project["name"], rotation_metadata)
state_success = update_project_state(project["name"], rotation_metadata)
rotation_duration = max(0.0, time.monotonic() - rotation_start_time)
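# Seed the fresh log with the rendered rotation header, when a template was produced.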
if rendered_template:
await _write_rotated_log_header(log_path, rendered_template)
try:
new_snapshot = _snapshot_file_state(log_path)
except OSError:
new_snapshot = {"size_bytes": 0, "mtime_ns": None, "inode": None}
new_log_line_count = rendered_template.count("\n") if rendered_template else 0
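# Blend the observed bytes-per-line into the cached EMA: a full-strength update when the
# archive line count is exact, a half-strength update when it is only an estimate.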
ema_value = cached_ema
ema_source = "post_rotation"
initialized_flag = cached_initialized
if not entry_count_approximate:
observed_bpl = _compute_bytes_per_line(archive_size, rotated_entries)
if observed_bpl:
ema_value = _blend_ema(cached_ema, observed_bpl, EMA_SMOOTHING_ALPHA)
ema_source = "post_rotation_precise"
initialized_flag = True
else:
approx_bpl = _compute_bytes_per_line(archive_size, entry_estimate.count)
if approx_bpl:
ema_value = _blend_ema(cached_ema, approx_bpl, EMA_SMOOTHING_ALPHA / 2)
ema_source = "post_rotation_estimate"
state_manager.update_log_stats(
project["name"],
log_type,
size_bytes=new_snapshot.get("size_bytes"),
line_count=new_log_line_count,
ema_bytes_per_line=ema_value,
mtime_ns=new_snapshot.get("mtime_ns"),
inode=new_snapshot.get("inode"),
source=ema_source,
initialized=initialized_flag,
)
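# Assemble the rotation response; several values are surfaced under multiple keys
# (e.g. archive_sha256 and archive_hash).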
result.update({
"ok": True,
"rotation_completed": True,
"rotation_id": rotation_id,
"rotation_timestamp_utc": rotation_timestamp,
"rotation_timestamp": rotation_timestamp,
"project": project["name"],
"log_type": log_type,
"sequence_number": sequence_number,
"current_log_path": str(log_path),
"archive_path": str(archive_path),
"archived_to": str(archive_path),
"archive_sha256": archive_hash,
"archive_hash": archive_hash,
"archive_size_bytes": archive_size,
"archive_size_mb": archive_info.get("size_mb"),
"rotated_entry_count": rotated_entries,
"entry_count": rotated_entries,
"entry_count_approximate": entry_count_approximate,
"entry_count_method": entry_count_method,
"estimated_entry_count": entry_estimate.count,
"estimated_entry_count_method": entry_estimate.method,
"estimated_entry_count_approximate": entry_estimate.approximate,
"estimation_band": estimation_band,
"estimation_details": dict(entry_estimate.details),
"estimation_decision": estimation_decision or "rotate",
"rotation_duration_seconds": rotation_duration,
"hash_chain_previous": previous_hash,
"hash_chain_root": hash_chain_info.get("root_hash"),
"hash_chain_sequence": sequence_number,
"audit_trail_stored": audit_success,
"state_updated": state_success,
"auto_threshold_triggered": auto_triggered,
"template_generated": bool(rendered_template),
"integrity_verified": archive_hash is not None,
})
if threshold_limit:
result["threshold_entries"] = threshold_limit
if parsed_metadata:
result["custom_metadata_applied"] = True
result["custom_metadata"] = parsed_metadata
return result
def _merge_single_rotation_response(summary: Dict[str, Any], rotation_result: Dict[str, Any]) -> Dict[str, Any]:
"""Merge rotation result into summary using ConfigManager utilities."""
merged = build_response_payload(summary, **rotation_result)
merged.setdefault("rotations", summary.get("rotations", [rotation_result]))
return merged
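# Hypothetical usage sketch (assuming build_response_payload folds keyword fields into a copy
# of the summary): _merge_single_rotation_response({"ok": True}, rotation_result) returns the
# summary plus the rotation fields, with "rotations" defaulting to [rotation_result] when the
# summary carried no rotation list of its own.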
def _snapshot_file_state(path: Path) -> Dict[str, Any]:
"""Capture the file's size, mtime (nanoseconds), and inode for cache validation."""
stat_result = path.stat()
inode = getattr(stat_result, "st_ino", None)
if not inode:
inode = None
return {
"size_bytes": stat_result.st_size,
"mtime_ns": getattr(stat_result, "st_mtime_ns", int(stat_result.st_mtime * 1_000_000_000)),
"inode": inode,
}
def _estimate_entry_count(snapshot: Dict[str, Any], cached_stats: Optional[Dict[str, Any]]) -> EntryCountEstimate:
"""Estimate entry count using FileSizeEstimator."""
size_bytes = snapshot.get("size_bytes", 0) or 0
mtime_ns = snapshot.get("mtime_ns")
estimate = _FILE_SIZE_ESTIMATOR.estimate_entry_count_with_cache(
size_bytes, cached_stats, mtime_ns
)
# Add cached_initialized flag if it was in the original cached stats
if cached_stats and cached_stats.get("initialized") and "cached_initialized" not in estimate.details:
estimate.details["cached_initialized"] = cached_stats.get("initialized")
return estimate
def _refine_entry_estimate(log_path: Path, snapshot: Dict[str, Any], estimate: EntryCountEstimate) -> Optional[EntryCountEstimate]:
"""Refine entry count estimate using FileSizeEstimator."""
size_bytes = snapshot.get("size_bytes", 0)
return _FILE_SIZE_ESTIMATOR.refine_estimate_with_sampling(log_path, size_bytes, estimate)
def _compute_estimation_band(threshold: Optional[int]) -> Optional[int]:
"""Compute estimation band using ThresholdEstimator."""
return _THRESHOLD_ESTIMATOR.compute_estimation_band(threshold)
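# The band is presumably derived from ESTIMATION_BAND_RATIO/ESTIMATION_BAND_MIN (e.g. a
# 500-entry threshold would give max(250, 50) = 250), but the exact formula lives in
# ThresholdEstimator.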
def _classify_estimate(value: int, threshold: int, band: Optional[int]) -> str:
"""Classify estimate using ThresholdEstimator with compatible return values."""
classification = _THRESHOLD_ESTIMATOR.classify_estimate(value, threshold, band)
# Map to original return values for backward compatibility
mapping = {
"well_below_threshold": "below",
"well_above_threshold": "above",
"near_threshold": "undecided",
"below_threshold": "below",
"above_threshold": "above"
}
return mapping.get(classification, "undecided")
def _compute_bytes_per_line(size_bytes: Optional[int], line_count: Optional[int]) -> Optional[float]:
"""Compute bytes per line using FileSizeEstimator."""
return _FILE_SIZE_ESTIMATOR.compute_bytes_per_line(size_bytes, line_count)
def _clamp_bytes_per_line(value: float) -> float:
"""Clamp a bytes-per-line value within reasonable bounds."""
return _FILE_SIZE_ESTIMATOR.clamp_bytes_per_line(value)
def _blend_ema(current: Optional[float], observed: Optional[float], smoothing: float) -> Optional[float]:
"""Blend a cached bytes-per-line EMA with a new observation, clamping inputs and the result."""
if observed is None:
return current
observed = _clamp_bytes_per_line(observed)
smoothing = max(0.0, min(1.0, smoothing))
if current is None:
return observed
current = _clamp_bytes_per_line(current)
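# e.g. smoothing=0.2, current=80.0, observed=120.0 -> 0.8*80.0 + 0.2*120.0 = 88.0 (then clamped)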
blended = (1.0 - smoothing) * current + smoothing * observed
return _clamp_bytes_per_line(blended)