"""Advanced querying for progress log entries."""
from __future__ import annotations
import re
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from scribe_mcp.utils.time import coerce_range_boundary, format_utc, utcnow
from scribe_mcp import server as server_module
from scribe_mcp.server import app
from scribe_mcp.tools.constants import STATUS_EMOJI
from scribe_mcp.tools.project_utils import load_project_config
from scribe_mcp.utils.config_manager import ConfigManager, validate_enum_value, validate_range, BulletproofFallbackManager
from scribe_mcp.utils.logs import parse_log_line, read_all_lines
from scribe_mcp.utils.search import message_matches
from scribe_mcp.utils.response import create_pagination_info, default_formatter
from scribe_mcp.utils.tokens import token_estimator
from scribe_mcp.utils.estimator import PaginationCalculator
from scribe_mcp.utils.bulk_processor import BulkProcessor
from scribe_mcp.utils.error_handler import ErrorHandler, ExceptionHealer
from scribe_mcp.utils.parameter_validator import ToolValidator, BulletproofParameterCorrector
from scribe_mcp.tools.config.query_entries_config import QueryEntriesConfig
from scribe_mcp.shared.logging_utils import (
LoggingContext,
ProjectResolutionError,
clean_list as shared_clean_list,
normalize_meta_filters,
resolve_logging_context,
)
from scribe_mcp.shared.base_logging_tool import LoggingToolMixin
VALID_MESSAGE_MODES = {"substring", "regex", "exact"}
VALID_SEARCH_SCOPES = {"project", "global", "all_projects", "research", "bugs", "all", "sentinel"}
VALID_DOCUMENT_TYPES = {"progress", "research", "architecture", "bugs", "global", "sentinel_log"}
# Global configuration manager for parameter handling
_CONFIG_MANAGER = ConfigManager("query_entries")
# Global pagination calculator
_PAGINATION_CALCULATOR = PaginationCalculator()
# Phase 3 Enhanced utilities integration
_PARAMETER_CORRECTOR = BulletproofParameterCorrector()
_EXCEPTION_HEALER = ExceptionHealer()
_FALLBACK_MANAGER = BulletproofFallbackManager()
class _QueryEntriesHelper(LoggingToolMixin):
def __init__(self) -> None:
self.server_module = server_module
_HELPER = _QueryEntriesHelper()
def _validate_search_parameters(
project: Optional[str],
start: Optional[str],
end: Optional[str],
message: Optional[str],
message_mode: str,
case_sensitive: bool,
emoji: Optional[List[str]],
status: Optional[List[str]],
agents: Optional[List[str]],
meta_filters: Optional[Dict[str, Any]],
limit: int,
page: int,
page_size: int,
compact: bool,
fields: Optional[List[str]],
include_metadata: bool,
search_scope: str,
document_types: Optional[List[str]],
include_outdated: bool,
verify_code_references: bool,
time_range: Optional[str],
relevance_threshold: float,
max_results: Optional[int],
config: Optional[QueryEntriesConfig],
priority: Optional[List[str]] = None,
category: Optional[List[str]] = None,
min_confidence: Optional[float] = None,
priority_sort: bool = False,
) -> Tuple[QueryEntriesConfig, Dict[str, Any]]:
"""
Validate and prepare search parameters using enhanced Phase 3 utilities.
This function replaces the monolithic parameter handling section of query_entries
with bulletproof parameter validation and healing.
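Returns a (QueryEntriesConfig, info) tuple; ``info`` reports whether any
healing was applied, e.g. (illustrative values only):
    {"healing_applied": True, "healed_params": {"page_size": 50}}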
"""
try:
# Apply Phase 1 BulletproofParameterCorrector for initial parameter healing
healed_params = {}
healing_applied = False
# Heal enum parameters
if message_mode:
healed_message_mode = _PARAMETER_CORRECTOR.correct_enum_parameter(
message_mode, VALID_MESSAGE_MODES, field_name="message_mode"
)
if healed_message_mode != message_mode:
healed_params["message_mode"] = healed_message_mode
healing_applied = True
if search_scope:
healed_search_scope = _PARAMETER_CORRECTOR.correct_enum_parameter(
search_scope, VALID_SEARCH_SCOPES, field_name="search_scope"
)
if healed_search_scope != search_scope:
healed_params["search_scope"] = healed_search_scope
healing_applied = True
# Heal array parameters
if emoji:
healed_emoji = _PARAMETER_CORRECTOR.correct_list_parameter(emoji)
if healed_emoji != emoji:
healed_params["emoji"] = healed_emoji
healing_applied = True
if status:
healed_status = _PARAMETER_CORRECTOR.correct_list_parameter(status)
if healed_status != status:
healed_params["status"] = healed_status
healing_applied = True
if agents:
healed_agents = _PARAMETER_CORRECTOR.correct_list_parameter(agents)
if healed_agents != agents:
healed_params["agents"] = healed_agents
healing_applied = True
if fields:
healed_fields = _PARAMETER_CORRECTOR.correct_list_parameter(fields)
if healed_fields != fields:
healed_params["fields"] = healed_fields
healing_applied = True
if document_types:
healed_document_types = _PARAMETER_CORRECTOR.correct_list_parameter(document_types)
if healed_document_types != document_types:
healed_params["document_types"] = healed_document_types
healing_applied = True
# Heal range parameters
if limit is not None:
healed_limit = _PARAMETER_CORRECTOR.correct_numeric_parameter(
limit, min_val=1, max_val=1000, field_name="limit"
)
if healed_limit != limit:
healed_params["limit"] = healed_limit
healing_applied = True
healed_page = _PARAMETER_CORRECTOR.correct_numeric_parameter(
page, min_val=1, max_val=10000, field_name="page"
)
if healed_page != page:
healed_params["page"] = healed_page
healing_applied = True
healed_page_size = _PARAMETER_CORRECTOR.correct_numeric_parameter(
page_size, min_val=1, max_val=1000, field_name="page_size"
)
if healed_page_size != page_size:
healed_params["page_size"] = healed_page_size
healing_applied = True
healed_relevance_threshold = _PARAMETER_CORRECTOR.correct_numeric_parameter(
relevance_threshold, min_val=0.0, max_val=1.0, field_name="relevance_threshold"
)
if healed_relevance_threshold != relevance_threshold:
healed_params["relevance_threshold"] = healed_relevance_threshold
healing_applied = True
# Heal string parameters
if project:
healed_project = _PARAMETER_CORRECTOR.correct_message_parameter(project)
if healed_project != project:
healed_params["project"] = healed_project
healing_applied = True
if message:
healed_message = _PARAMETER_CORRECTOR.correct_message_parameter(message)
if healed_message != message:
healed_params["message"] = healed_message
healing_applied = True
if start:
healed_start = _PARAMETER_CORRECTOR.correct_message_parameter(start)
if healed_start != start:
healed_params["start"] = healed_start
healing_applied = True
if end:
healed_end = _PARAMETER_CORRECTOR.correct_message_parameter(end)
if healed_end != end:
healed_params["end"] = healed_end
healing_applied = True
if time_range:
healed_time_range = _PARAMETER_CORRECTOR.correct_message_parameter(time_range)
if healed_time_range != time_range:
healed_params["time_range"] = healed_time_range
healing_applied = True
# Update parameters with healed values
final_project = healed_params.get("project", project)
final_start = healed_params.get("start", start)
final_end = healed_params.get("end", end)
final_message = healed_params.get("message", message)
final_message_mode = healed_params.get("message_mode", message_mode)
final_case_sensitive = case_sensitive # Boolean parameter
final_emoji = healed_params.get("emoji", emoji)
final_status = healed_params.get("status", status)
final_agents = healed_params.get("agents", agents)
final_meta_filters = meta_filters
final_limit = healed_params.get("limit", limit)
final_page = healed_params.get("page", page)
final_page_size = healed_params.get("page_size", page_size)
final_compact = compact # Boolean parameter
final_fields = healed_params.get("fields", fields)
final_include_metadata = include_metadata # Boolean parameter
final_search_scope = healed_params.get("search_scope", search_scope)
final_document_types = healed_params.get("document_types", document_types)
final_include_outdated = include_outdated # Boolean parameter
final_verify_code_references = verify_code_references # Boolean parameter
final_time_range = healed_params.get("time_range", time_range)
final_relevance_threshold = healed_params.get("relevance_threshold", relevance_threshold)
final_max_results = healed_params.get("max_results", max_results)
# Create configuration using dual parameter support
if config is not None:
# Create configuration from legacy parameters
legacy_config = QueryEntriesConfig.from_legacy_params(
project=final_project,
start=final_start,
end=final_end,
message=final_message,
message_mode=final_message_mode,
case_sensitive=final_case_sensitive,
emoji=final_emoji,
status=final_status,
agents=final_agents,
meta_filters=final_meta_filters,
limit=final_limit,
page=final_page,
page_size=final_page_size,
compact=final_compact,
fields=final_fields,
include_metadata=final_include_metadata,
search_scope=final_search_scope,
document_types=final_document_types,
include_outdated=final_include_outdated,
verify_code_references=final_verify_code_references,
time_range=final_time_range,
relevance_threshold=final_relevance_threshold,
max_results=final_max_results
)
# Merge with provided config (legacy parameters take precedence)
config_dict = config.to_dict()
legacy_dict = legacy_config.to_dict()
for key, value in legacy_dict.items():
if value is not None:
config_dict[key] = value
final_config = QueryEntriesConfig(**config_dict)
else:
final_config = QueryEntriesConfig.from_legacy_params(
project=final_project,
start=final_start,
end=final_end,
message=final_message,
message_mode=final_message_mode,
case_sensitive=final_case_sensitive,
emoji=final_emoji,
status=final_status,
agents=final_agents,
meta_filters=final_meta_filters,
limit=final_limit,
page=final_page,
page_size=final_page_size,
compact=final_compact,
fields=final_fields,
include_metadata=final_include_metadata,
search_scope=final_search_scope,
document_types=final_document_types,
include_outdated=final_include_outdated,
verify_code_references=final_verify_code_references,
time_range=final_time_range,
relevance_threshold=final_relevance_threshold,
max_results=final_max_results
)
return final_config, {"healing_applied": healing_applied, "healed_params": healed_params}
except Exception as e:
# Apply Phase 2 ExceptionHealer for parameter validation errors
healed_exception = _EXCEPTION_HEALER.heal_parameter_validation_error(
e,
{
"operation_type": "query_entries",
"parameters": {
"project": project,
"start": start,
"end": end,
"message": message,
"message_mode": message_mode,
"search_scope": search_scope,
"limit": limit,
"page": page,
"page_size": page_size,
"fields": fields,
"emoji": emoji,
"status": status,
"agents": agents,
"document_types": document_types,
},
},
)
if healed_exception.get("success"):
healed_values = healed_exception.get("healed_values", {})
# Create safe fallback configuration
safe_config = QueryEntriesConfig.from_legacy_params(
project=healed_values.get("project", project),
start=healed_values.get("start", start),
end=healed_values.get("end", end),
message=healed_values.get("message", message),
message_mode=healed_values.get("message_mode", message_mode),
case_sensitive=case_sensitive,
emoji=healed_values.get("emoji", emoji),
status=healed_values.get("status", status),
agents=healed_values.get("agents", agents),
meta_filters=meta_filters,
limit=healed_values.get("limit", limit),
page=healed_values.get("page", page),
page_size=healed_values.get("page_size", page_size),
compact=compact,
fields=healed_values.get("fields", fields),
include_metadata=include_metadata,
search_scope=healed_values.get("search_scope", search_scope),
document_types=healed_values.get("document_types", document_types),
include_outdated=include_outdated,
verify_code_references=verify_code_references,
time_range=healed_values.get("time_range", time_range),
relevance_threshold=healed_values.get("relevance_threshold", relevance_threshold),
max_results=healed_values.get("max_results", max_results)
)
return safe_config, {
"healing_applied": True,
"exception_healing": True,
"healed_params": healed_values,
"fallback_used": True
}
else:
# Ultimate fallback - use BulletproofFallbackManager
fallback_params = _FALLBACK_MANAGER.apply_emergency_fallback("query_entries", {
"project": project or "default",
"message": message,
"search_scope": search_scope,
"limit": limit
})
emergency_config = QueryEntriesConfig.from_legacy_params(
project=fallback_params.get("project", "default"),
start=start,
end=end,
message=fallback_params.get("message", "Query executed after error recovery"),
message_mode=fallback_params.get("message_mode", "substring"),
case_sensitive=case_sensitive,
emoji=fallback_params.get("emoji", emoji),
status=fallback_params.get("status", status),
agents=fallback_params.get("agents", agents),
meta_filters=meta_filters,
limit=fallback_params.get("limit", 50),
page=fallback_params.get("page", 1),
page_size=fallback_params.get("page_size", 50),
compact=compact,
fields=fallback_params.get("fields", fields),
include_metadata=include_metadata,
search_scope=fallback_params.get("search_scope", "project"),
document_types=fallback_params.get("document_types", document_types),
include_outdated=include_outdated,
verify_code_references=verify_code_references,
time_range=fallback_params.get("time_range", time_range),
relevance_threshold=fallback_params.get("relevance_threshold", 0.0),
max_results=fallback_params.get("max_results", max_results)
)
return emergency_config, {
"healing_applied": True,
"emergency_fallback": True,
"fallback_params": fallback_params
}
def _build_search_query(
final_config: QueryEntriesConfig,
context,
project: Dict[str, Any]
) -> Dict[str, Any]:
"""
Build and prepare search query with enhanced error handling.
This function extracts the search query building logic from the monolithic
query_entries function and adds bulletproof error handling.
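The returned mapping always carries ``query_built`` plus, on success,
``search_params``, ``project_context``, ``resolved_project`` and any
``validation_warnings`` collected along the way.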
"""
try:
# Extract parameters from config
project_name = final_config.project
start = final_config.start
end = final_config.end
message = final_config.message
message_mode = final_config.message_mode
case_sensitive = final_config.case_sensitive
emoji = final_config.emoji
status = final_config.status
agents = final_config.agents
meta_filters = final_config.meta_filters
limit = final_config.limit
page = final_config.page
page_size = final_config.page_size
search_scope = final_config.search_scope
document_types = final_config.document_types
include_outdated = final_config.include_outdated
verify_code_references = final_config.verify_code_references
time_range = final_config.time_range
relevance_threshold = final_config.relevance_threshold
max_results = final_config.max_results
# Resolve project context using the already prepared context from the caller
project_context = context
resolved_project = None
if isinstance(project, dict):
resolved_project = project.get("name") or project.get("project") or project.get("id")
if not resolved_project and getattr(context, "project", None):
resolved_project = (context.project or {}).get("name")
if not resolved_project:
resolved_project = project_name or "default"
# Build search parameters dictionary
search_params = {
"project": resolved_project,
"start": start,
"end": end,
"message": message,
"message_mode": message_mode,
"case_sensitive": case_sensitive,
"emoji": emoji,
"status": status,
"agents": agents,
"meta_filters": meta_filters,
"limit": limit,
"page": page,
"page_size": page_size,
"search_scope": search_scope,
"document_types": document_types,
"include_outdated": include_outdated,
"verify_code_references": verify_code_references,
"time_range": time_range,
"relevance_threshold": relevance_threshold,
"max_results": max_results,
"priority": final_config.priority if hasattr(final_config, 'priority') else None,
"category": final_config.category if hasattr(final_config, 'category') else None,
"min_confidence": final_config.min_confidence if hasattr(final_config, 'min_confidence') else None,
"priority_sort": final_config.priority_sort if hasattr(final_config, 'priority_sort') else False,
}
# Validate search parameters
validation_errors = []
# Validate time range parameters
if start and end:
try:
start_dt = datetime.fromisoformat(start.replace('Z', '+00:00'))
end_dt = datetime.fromisoformat(end.replace('Z', '+00:00'))
if start_dt > end_dt:
validation_errors.append("Start time must be before end time")
except ValueError as time_error:
healed_time = _EXCEPTION_HEALER.heal_parameter_validation_error(
time_error, {"start": start, "end": end, "error_type": "time_parsing"}
)
if not healed_time or not healed_time.get("success"):
validation_errors.append(f"Invalid time format: {str(time_error)}")
# Validate pagination parameters
if page < 1:
healed_page = _PARAMETER_CORRECTOR.correct_numeric_parameter(
page, min_val=1, max_val=10000, field_name="page"
)
search_params["page"] = healed_page
if healed_page != page:
validation_errors.append(f"Page number corrected from {page} to {healed_page}")
if page_size < 1 or page_size > 1000:
healed_page_size = _PARAMETER_CORRECTOR.correct_numeric_parameter(
page_size, min_val=1, max_val=1000, field_name="page_size"
)
search_params["page_size"] = healed_page_size
if healed_page_size != page_size:
validation_errors.append(f"Page size corrected from {page_size} to {healed_page_size}")
# Validate limit parameter
if limit is not None and (limit < 1 or limit > 1000):
healed_limit = _PARAMETER_CORRECTOR.correct_numeric_parameter(
limit, min_val=1, max_val=1000, field_name="limit"
)
search_params["limit"] = healed_limit
if healed_limit != limit:
validation_errors.append(f"Limit corrected from {limit} to {healed_limit}")
# Validate relevance threshold
if relevance_threshold < 0.0 or relevance_threshold > 1.0:
healed_threshold = _PARAMETER_CORRECTOR.correct_numeric_parameter(
relevance_threshold, min_val=0.0, max_val=1.0, field_name="relevance_threshold"
)
search_params["relevance_threshold"] = healed_threshold
if healed_threshold != relevance_threshold:
validation_errors.append(f"Relevance threshold corrected from {relevance_threshold} to {healed_threshold}")
# Return search query with validation warnings if any
return {
"search_params": search_params,
"project_context": project_context,
"resolved_project": resolved_project,
"validation_warnings": validation_errors,
"query_built": True
}
except Exception as e:
# Apply comprehensive exception healing for query building
healed_result = _EXCEPTION_HEALER.heal_complex_exception_combination(
e, {
"operation": "build_search_query",
"config": final_config.to_dict(),
"project": project
}
)
if healed_result and healed_result.get("success") and "healed_values" in healed_result:
# Create emergency search query
emergency_params = _FALLBACK_MANAGER.apply_emergency_fallback(
"query_entries", healed_result["healed_values"]
)
emergency_config = QueryEntriesConfig.from_legacy_params(
project=emergency_params.get("project", "default"),
message=emergency_params.get("message", "fallback query"),
message_mode=emergency_params.get("message_mode", "substring"),
limit=emergency_params.get("limit", 10),
page=emergency_params.get("page", 1),
page_size=emergency_params.get("page_size", 10),
search_scope=emergency_params.get("search_scope", "project")
)
# Return emergency query
return {
"search_params": emergency_config.to_dict(),
"project_context": context,
"resolved_project": emergency_params.get("project", "default"),
"validation_warnings": [f"Query built with emergency fallback due to error: {str(e)}"],
"query_built": True,
"emergency_fallback": True
}
else:
# Return error if query building fails
return {
"search_params": {},
"project_context": context,
"resolved_project": project.get("name", "default") if project else "default",
"validation_warnings": [f"Failed to build search query: {str(e)}"],
"query_built": False,
"error": str(e)
}
async def _execute_search_with_fallbacks(
search_query: Dict[str, Any],
final_config: QueryEntriesConfig
) -> Dict[str, Any]:
"""
Execute search query with comprehensive error handling and intelligent fallbacks.
This function extracts the search execution logic from the monolithic query_entries
function and adds bulletproof error handling with multiple fallback strategies.
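Fallback order: heal the failing operation in place, then retry once with
simplified parameters (reduced limit/page_size), and finally emit a single
synthetic "emergency" entry so callers always receive a well-formed response.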
"""
try:
search_params = search_query["search_params"]
project_context = search_query["project_context"]
resolved_project = search_query["resolved_project"]
validation_warnings = search_query.get("validation_warnings", [])
# Execute the search with error handling
try:
# Get log file path
if resolved_project and project_context.project:
log_path = Path(project_context.project["progress_log"])
else:
# Fallbacks when no active project is resolved.
# Prefer a configured project (if present), otherwise try common on-disk layouts.
fallback_project = None
if resolved_project and resolved_project != "default":
try:
fallback_project = load_project_config(resolved_project)
except Exception:
fallback_project = None
if fallback_project and fallback_project.get("progress_log"):
log_path = Path(fallback_project["progress_log"])
else:
candidates: List[Path] = []
# Most common runtime layout (used by tests and Codex CLI runs)
if resolved_project and isinstance(resolved_project, str):
slug = resolved_project.strip().lower().replace(" ", "_")
candidates.append(Path(".scribe") / "docs" / "dev_plans" / slug / "PROGRESS_LOG.md")
# Template defaults (legacy fallbacks)
candidates.append(Path(".scribe") / "docs" / "dev_plans" / "default" / "PROGRESS_LOG.md")
candidates.append(Path("docs") / "dev_plans" / "default" / "PROGRESS_LOG.md")
# If a single project exists under .scribe/docs/dev_plans, use it.
base = Path(".scribe") / "docs" / "dev_plans"
try:
if base.exists():
candidates.extend(sorted(base.glob("*/PROGRESS_LOG.md")))
except Exception:
pass
log_path = next((p for p in candidates if p.exists()), candidates[0])
# Read log lines with error handling
try:
lines = await read_all_lines(log_path)
except Exception as read_error:
# Try to heal file reading error
healed_read = _EXCEPTION_HEALER.heal_document_operation_error(
read_error, {"log_path": str(log_path), "operation": "read_log"}
)
if healed_read.get("success") and healed_read.get("healed_values"):
# Try alternative log path
alt_log_path = healed_read["healed_values"].get("log_path", log_path)
lines = await read_all_lines(Path(alt_log_path))
else:
# Apply fallback - return empty results
lines = []
validation_warnings.append(f"Could not read log file: {str(read_error)}")
# Apply search filters with error handling
filtered_entries = []
for line in lines:
try:
parsed = parse_log_line(line)
if not parsed:
continue
# Apply message filter
message = parsed.get("message", "")
if search_params.get("message"):
if not message_matches(
message,
search_params["message"],
mode=search_params.get("message_mode", "substring"),
case_sensitive=search_params.get("case_sensitive", False),
):
continue
# Apply emoji filter
if search_params.get("emoji"):
entry_emoji = parsed.get("emoji", "")
if entry_emoji not in search_params["emoji"]:
continue
# Apply status filter (statuses map to emojis via STATUS_EMOJI);
# keep the entry when its emoji matches ANY requested status
if search_params.get("status"):
entry_emoji = parsed.get("emoji", "")
allowed_emojis = {STATUS_EMOJI.get(s.lower()) for s in search_params["status"]}
allowed_emojis.discard(None)
if entry_emoji not in allowed_emojis:
continue
# Apply agent filter
if search_params.get("agents"):
entry_agent = parsed.get("agent", "")
if entry_agent not in search_params["agents"]:
continue
# Apply metadata filters
if search_params.get("meta_filters"):
entry_meta = parsed.get("meta", {}) or {}
normalized_filters, meta_error = normalize_meta_filters(search_params["meta_filters"])
if meta_error:
validation_warnings.append(f"Ignoring meta_filters due to error: {meta_error}")
normalized_filters = {}
if normalized_filters and not _meta_matches(entry_meta, normalized_filters):
continue
# Apply priority filter
if search_params.get("priority"):
entry_priority = parsed.get("meta", {}).get("priority", "medium")
if entry_priority not in search_params["priority"]:
continue
# Apply category filter
if search_params.get("category"):
entry_category = parsed.get("meta", {}).get("category")
if entry_category not in search_params["category"]:
continue
# Apply confidence filter
if search_params.get("min_confidence") is not None:
entry_confidence = float(parsed.get("meta", {}).get("confidence", 1.0))
if entry_confidence < search_params["min_confidence"]:
continue
# Apply time range filter
if search_params.get("start") or search_params.get("end"):
entry_timestamp = parsed.get("ts", "")
if entry_timestamp:
try:
entry_dt = datetime.fromisoformat(entry_timestamp.replace('Z', '+00:00'))
if search_params.get("start"):
start_dt = datetime.fromisoformat(search_params["start"].replace('Z', '+00:00'))
if entry_dt < start_dt:
continue
if search_params.get("end"):
end_dt = datetime.fromisoformat(search_params["end"].replace('Z', '+00:00'))
if entry_dt > end_dt:
continue
except ValueError:
# Skip entries with invalid timestamps
continue
# Apply relevance threshold (simplified implementation)
if search_params.get("relevance_threshold", 0.0) > 0.0:
# Simple heuristic based on message length, clamped to the 0-1 range
relevance_score = min(len(message) / 1000.0, 1.0)
if relevance_score < search_params["relevance_threshold"]:
continue
# Entry matches all filters
filtered_entries.append(parsed)
except Exception as filter_error:
# Try to heal individual entry processing error
healed_filter = _EXCEPTION_HEALER.heal_bulk_processing_error(
filter_error, {"line": line, "operation": "entry_filtering"}
)
if not healed_filter or not healed_filter.get("success"):
# Skip problematic entry but continue processing
continue
# Apply priority sorting if requested
if search_params.get("priority_sort", False):
from scribe_mcp.shared.log_enums import get_priority_sort_key
filtered_entries.sort(
key=lambda e: (
get_priority_sort_key((e.get("meta") or {}).get("priority", "medium")),
e.get("ts_iso", "")
),
reverse=False  # Lower sort keys first (critical=0); timestamps ascend within a priority
)
# Apply pagination with error handling
try:
page = search_params.get("page", 1)
page_size = search_params.get("page_size", 50)
limit = search_params.get("limit")
if limit is None:
limit = page_size
# Calculate pagination
total_entries = len(filtered_entries)
start_idx = (page - 1) * page_size
end_idx = start_idx + page_size
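# Example: page=3, page_size=50 -> entries[100:150]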
# Apply limit
if limit < page_size:
end_idx = start_idx + limit
paginated_entries = filtered_entries[start_idx:end_idx]
# Create pagination info
pagination_info = {
"page": page,
"page_size": page_size,
"total_entries": total_entries,
"has_next": end_idx < total_entries,
"has_prev": page > 1
}
except Exception as pagination_error:
# Try to heal pagination error
healed_pagination = _EXCEPTION_HEALER.heal_parameter_validation_error(
pagination_error, {"page": page, "page_size": page_size, "limit": limit}
)
if healed_pagination["success"]:
# Use healed pagination values
healed_page = healed_pagination["healed_values"].get("page", 1)
healed_page_size = healed_pagination["healed_values"].get("page_size", 50)
start_idx = (healed_page - 1) * healed_page_size
end_idx = start_idx + healed_page_size
paginated_entries = filtered_entries[start_idx:end_idx]
pagination_info = {
"page": healed_page,
"page_size": healed_page_size,
"total_entries": len(filtered_entries),
"has_next": end_idx < len(filtered_entries),
"has_prev": healed_page > 1
}
validation_warnings.append(f"Pagination corrected due to error: {str(pagination_error)}")
else:
# Apply safe pagination fallback
paginated_entries = filtered_entries[:50] # First 50 entries
pagination_info = {
"page": 1,
"page_size": 50,
"total_entries": len(filtered_entries),
"has_next": len(filtered_entries) > 50,
"has_prev": False
}
validation_warnings.append(f"Pagination error, using safe fallback: {str(pagination_error)}")
# Format response with error handling
try:
compact = search_params.get("compact", False)
fields = search_params.get("fields")
include_metadata = search_params.get("include_metadata", True)
if compact:
# Format compact response
formatted_entries = []
for entry in paginated_entries:
msg = entry.get("message", "")
compact_entry = {
"id": entry.get("id", ""),
"ts": entry.get("ts", ""),
"msg": msg[:100] + "..." if len(msg) > 100 else msg,
"emoji": entry.get("emoji", ""),
"agent": entry.get("agent", "")
}
if include_metadata and entry.get("meta"):
compact_entry["meta"] = entry["meta"]
formatted_entries.append(compact_entry)
else:
# Format full response with field filtering
formatted_entries = []
for entry in paginated_entries:
if fields:
# Include only requested fields
filtered_entry = {}
for field in fields:
if field in entry:
filtered_entry[field] = entry[field]
formatted_entries.append(filtered_entry)
else:
# Include all fields
formatted_entry = entry.copy()
if not include_metadata and "meta" in formatted_entry:
del formatted_entry["meta"]
formatted_entries.append(formatted_entry)
except Exception as format_error:
# Try to heal response formatting error
healed_format = _EXCEPTION_HEALER.heal_document_operation_error(
format_error, {"operation": "response_formatting", "entries_count": len(paginated_entries)}
)
if healed_format["success"]:
# Use healed formatting
formatted_entries = paginated_entries # Simple fallback
else:
# Apply safe formatting fallback
formatted_entries = []
for entry in paginated_entries:
safe_entry = {
"id": entry.get("id", ""),
"ts": entry.get("ts", ""),
"message": entry.get("message", ""),
"emoji": entry.get("emoji", ""),
"agent": entry.get("agent", "")
}
formatted_entries.append(safe_entry)
validation_warnings.append(f"Response formatting error, using safe fallback: {str(format_error)}")
# Return successful search results
return {
"ok": True,
"entries": formatted_entries,
"pagination": pagination_info,
"search_params": search_params,
"validation_warnings": validation_warnings,
"total_found": len(filtered_entries),
"returned": len(formatted_entries)
}
except Exception as search_error:
# Apply comprehensive search error healing
healed_search = _EXCEPTION_HEALER.heal_document_operation_error(
search_error, {"operation": "search_execution", "project": resolved_project}
)
if healed_search["success"]:
# Try alternative search with simplified parameters
effective_limit = search_params.get("limit")
if effective_limit is None:
effective_limit = search_params.get("page_size", 50)
simplified_params = {
"project": resolved_project,
"message": search_params.get("message", ""),
"limit": min(effective_limit, 10), # Reduce limit for safety
"page": 1,
"page_size": min(search_params.get("page_size", 50) or 50, 10)
}
# Execute simplified search
return await _execute_search_with_fallbacks(
{"search_params": simplified_params, "project_context": project_context, "resolved_project": resolved_project},
final_config
)
else:
# Return error response
return {
"ok": False,
"error": f"Search execution failed: {str(search_error)}",
"suggestion": "Try with simpler search parameters",
"validation_warnings": validation_warnings,
"search_params": search_params
}
except Exception as e:
# Apply ultimate exception healing for search execution
healed_result = _EXCEPTION_HEALER.heal_emergency_exception(
e, {"operation": "search_execution", "project": search_query.get("resolved_project", "unknown")}
)
if healed_result and healed_result.get("success") and "healed_values" in healed_result:
# Create emergency search results
emergency_params = _FALLBACK_MANAGER.apply_emergency_fallback(
"query_entries", healed_result["healed_values"]
)
return {
"ok": True,
"entries": [{
"id": f"emergency-{uuid.uuid4().hex[:16]}",
"ts": format_utc(utcnow()),
"message": emergency_params.get("message", "Emergency search result after critical error"),
"emoji": emergency_params.get("emoji", "🚨"),
"agent": emergency_params.get("agent", "Scribe"),
"meta": {"emergency_fallback": True, "critical_error": str(e)}
}],
"pagination": {
"page": 1,
"page_size": 1,
"total_entries": 1,
"has_next": False,
"has_prev": False
},
"validation_warnings": [f"Emergency fallback due to critical error: {str(e)}"],
"total_found": 1,
"returned": 1,
"emergency_fallback": True
}
else:
return {
"ok": False,
"error": f"Critical search error: {str(e)}",
"suggestion": "Check system configuration and try again",
"entries": [],
"pagination": {},
"total_found": 0,
"returned": 0
}
@app.tool()
async def query_entries(
project: Optional[str] = None,
start: Optional[str] = None,
end: Optional[str] = None,
message: Optional[str] = None,
message_mode: str = "substring",
case_sensitive: bool = False,
emoji: Optional[List[str]] = None,
status: Optional[List[str]] = None,
agent: Optional[Any] = None, # tolerate legacy single-agent param
agents: Optional[List[str]] = None,
meta_filters: Optional[Dict[str, Any]] = None,
limit: int = 50,
page: int = 1,
page_size: int = 10,
compact: bool = False,
fields: Optional[List[str]] = None,
include_metadata: bool = True,
# Phase 4 Enhanced Search Parameters
search_scope: str = "project", # "project"|"global"|"all_projects"|"research"|"bugs"|"all"
document_types: Optional[List[str]] = None, # ["progress", "research", "architecture", "bugs", "global"]
include_outdated: bool = True, # Include warnings for stale info
verify_code_references: bool = False, # Check if mentioned code still exists
time_range: Optional[str] = None, # "last_30d", "last_7d", "today", etc.
relevance_threshold: float = 0.0, # 0.0-1.0 relevance scoring threshold
max_results: Optional[int] = None, # Override for limit (deprecated but kept for compatibility)
config: Optional[QueryEntriesConfig] = None, # Configuration object for dual parameter support
format: str = "readable", # Output format: readable (default), structured, compact
# Phase 5 Priority/Category Filter Parameters
priority: Optional[List[str]] = None, # Filter by priority levels (e.g., ["critical", "high"])
category: Optional[List[str]] = None, # Filter by categories (e.g., ["bug", "security"])
min_confidence: Optional[float] = None, # Minimum confidence threshold (0.0-1.0)
priority_sort: bool = False, # If True, sort by priority (critical first) then by time
**_kwargs: Any, # tolerate unknown kwargs (contract: tools never TypeError)
) -> Dict[str, Any]:
"""Search the project log with flexible filters and pagination.
Args:
project: Project name (uses active project if None)
start: Start timestamp filter
end: End timestamp filter
message: Message text filter
message_mode: How to match message (substring, regex, exact)
case_sensitive: Case sensitive message matching
emoji: Filter by emoji(s)
status: Filter by status(es) (mapped to emojis)
agent: Legacy single-agent filter; merged into agents when provided
agents: Filter by agent name(s)
meta_filters: Filter by metadata key/value pairs
limit: Maximum results to return (legacy, for backward compatibility)
page: Page number for pagination (1-based)
page_size: Number of results per page
compact: Use compact response format with short field names
fields: Specific fields to include in response
include_metadata: Include metadata field in entries
# Phase 4 Enhanced Search Parameters
search_scope: Search scope - "project", "global", "all_projects", "research", "bugs", "all", or "sentinel"
document_types: Filter by document types - ["progress", "research", "architecture", "bugs", "global", "sentinel_log"]
include_outdated: Include outdated information with warnings
verify_code_references: Check if mentioned code still exists
time_range: Time range filter - "last_30d", "last_7d", "today", etc.
relevance_threshold: Minimum relevance score (0.0-1.0) for results
max_results: Override maximum results (deprecated, use limit/page_size instead)
config: Configuration object for dual parameter support. If provided, legacy parameters
take precedence when both are specified.
format: Output format - "readable" (human-friendly, default), "structured" (full JSON), "compact" (minimal)
# Phase 5 Priority/Category Filter Parameters
priority: Filter by priority levels (e.g., ["critical", "high"])
category: Filter by categories (e.g., ["bug", "security"])
min_confidence: Minimum confidence threshold (0.0-1.0)
priority_sort: If True, sort by priority (critical first) then by time
Returns:
Paginated response with entries and metadata
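
Example (illustrative; "demo" is an assumed project name, and the exact
response shape depends on the chosen format):
    result = await query_entries(project="demo", message="refactor",
                                 status=["done"], page=1, page_size=20)
    entries = result["entries"]  # matching entries for the requested page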
"""
# Phase 3 Task 3.5: Enhanced Function Decomposition
# This function now uses decomposed sub-functions with bulletproof error handling
# Back-compat: accept singular `agent=` and merge into `agents=`
if agent:
if agents is None:
agents = []
if isinstance(agent, str):
agents = [*agents, agent]
elif isinstance(agent, list):
agents = [*agents, *[a for a in agent if isinstance(a, str)]]
try:
state_snapshot = await server_module.state_manager.record_tool("query_entries")
except Exception:
state_snapshot = {}
try:
# === PHASE 3 ENHANCED PARAMETER VALIDATION AND PREPARATION ===
# Replace monolithic parameter handling with bulletproof validation and healing
final_config, validation_info = _validate_search_parameters(
project=project,
start=start,
end=end,
message=message,
message_mode=message_mode,
case_sensitive=case_sensitive,
emoji=emoji,
status=status,
agents=agents,
meta_filters=meta_filters,
limit=limit,
page=page,
page_size=page_size,
compact=compact,
fields=fields,
priority=priority,
category=category,
min_confidence=min_confidence,
priority_sort=priority_sort,
include_metadata=include_metadata,
search_scope=search_scope,
document_types=document_types,
include_outdated=include_outdated,
verify_code_references=verify_code_references,
time_range=time_range,
relevance_threshold=relevance_threshold,
max_results=max_results,
config=config
)
# === CONTEXT RESOLUTION WITH ENHANCED ERROR HANDLING ===
try:
context = await resolve_logging_context(
tool_name="query_entries",
server_module=server_module,
agent_id=None,
require_project=False, # query_entries can work without active project
state_snapshot=state_snapshot,
)
except Exception as context_error:
# Apply Phase 2 ExceptionHealer for context resolution errors; the
# minimal fallback context below is used whether or not healing succeeds.
_EXCEPTION_HEALER.heal_parameter_validation_error(
context_error, {"tool_name": "query_entries"}
)
context = type('obj', (object,), {
'project': None,
'recent_projects': [],
'reminders': []
})()
project = context.project or {}
# === ENHANCED SEARCH QUERY BUILDING ===
search_query = _build_search_query(final_config, context, project)
if not search_query.get("query_built", False):
# If query building failed, try to continue with emergency query
if search_query.get("emergency_fallback"):
# Execute emergency search
search_result = await _execute_search_with_fallbacks(search_query, final_config)
search_result["parameter_healing"] = True
search_result["emergency_fallback"] = True
return search_result
else:
# Return error if query building completely failed
return {
"ok": False,
"error": "Failed to build search query",
"details": search_query.get("error", "Unknown query building error"),
"suggestion": "Try with simpler search parameters"
}
# === ENHANCED SEARCH EXECUTION WITH FALLBACKS ===
search_result = await _execute_search_with_fallbacks(search_query, final_config)
# Add validation info to result if healing was applied
if validation_info.get("healing_applied"):
search_result["parameter_healing"] = True
if validation_info.get("exception_healing"):
search_result["parameter_exception_healing"] = True
elif validation_info.get("emergency_fallback"):
search_result["parameter_emergency_fallback"] = True
else:
search_result["parameter_healing_applied"] = True
# Add query building warnings
if search_query.get("validation_warnings"):
if "warnings" not in search_result:
search_result["warnings"] = []
search_result["warnings"].extend(search_query["validation_warnings"])
# Add search execution warnings
if search_result.get("validation_warnings"):
if "warnings" not in search_result:
search_result["warnings"] = []
search_result["warnings"].extend(search_result["validation_warnings"])
# Add search parameters for readable formatter to show in header
if message:
search_result["search_message"] = message
if status:
search_result["search_status"] = status
if agents:
search_result["search_agents"] = agents
if emoji:
search_result["search_emoji"] = emoji
# Route through formatter for readable/structured/compact output
return await default_formatter.finalize_tool_response(
data=search_result,
format=format,
tool_name="query_entries"
)
except Exception as e:
# === ULTIMATE EXCEPTION HANDLING AND FALLBACK ===
# Apply Phase 2 ExceptionHealer for unexpected errors
healed_result = _EXCEPTION_HEALER.heal_emergency_exception(
e, {
"operation": "query_entries_main",
"project": project,
"message": message,
"tool": "query_entries"
}
)
if healed_result and healed_result.get("success") and "healed_values" in healed_result:
# Create emergency search with healed parameters
emergency_params = _FALLBACK_MANAGER.apply_emergency_fallback(
"query_entries", healed_result["healed_values"]
)
# Create minimal emergency search result
return {
"ok": True,
"entries": [{
"id": f"emergency-{uuid.uuid4().hex[:16]}",
"ts": format_utc(utcnow()),
"message": emergency_params.get("message", "Emergency search result after critical error"),
"emoji": emergency_params.get("emoji", "🚨"),
"agent": emergency_params.get("agent", "Scribe"),
"meta": {
"emergency_fallback": True,
"critical_error": str(e),
"healed_exception": True
}
}],
"pagination": {
"page": 1,
"page_size": 1,
"total_entries": 1,
"has_next": False,
"has_prev": False
},
"emergency_fallback": True,
"original_error": str(e)
}
else:
# Return error if even emergency healing fails
return {
"ok": False,
"error": f"Critical error in query_entries: {str(e)}",
"emergency_healing_failed": True,
"suggestion": "Check system configuration and try again",
"entries": [],
"pagination": {}
}
async def _resolve_cross_project_projects(
search_scope: str,
document_types: Optional[List[str]],
) -> List[Dict[str, Any]]:
"""Resolve projects to search based on search scope and document types."""
state = await server_module.state_manager.load()
projects: List[Dict[str, Any]] = []
if search_scope == "all_projects":
# Search all configured projects
for project_name in state.projects:
# Try to get complete project data from state first
project_dict = state.get_project(project_name)
if project_dict:
# If project data is incomplete, load from config
if not project_dict.get("progress_log"):
fallback_project = load_project_config(project_name)
if fallback_project:
project_dict = fallback_project
project_dict["name"] = project_name
else:
# Skip projects without valid config
continue
else:
# Ensure name is set
project_dict["name"] = project_name
projects.append(project_dict)
elif search_scope == "global":
# Search only the global log
# Return a special project config for global log
global_config = {
"name": "global",
"progress_log": "docs/GLOBAL_PROGRESS_LOG.md",
"docs_dir": "docs",
"root": "."
}
projects.append(global_config)
elif search_scope in ["research", "bugs", "all"]:
# Search specific document types across all projects
for project_name in state.projects:
# Try to get complete project data from state first
project_dict = state.get_project(project_name)
if project_dict:
# If project data is incomplete, load from config
if not project_dict.get("progress_log"):
fallback_project = load_project_config(project_name)
if fallback_project:
project_dict = fallback_project
project_dict["name"] = project_name
else:
# Skip projects without valid config
continue
else:
# Ensure name is set
project_dict["name"] = project_name
# Check if project has the requested document types
if _project_has_document_types(project_dict, document_types, search_scope):
projects.append(project_dict)
# Always include global log for these scopes
global_config = {
"name": "global",
"progress_log": "docs/GLOBAL_PROGRESS_LOG.md",
"docs_dir": "docs",
"root": "."
}
projects.append(global_config)
return projects
def _project_has_document_types(
project: Dict[str, Any],
document_types: Optional[List[str]],
search_scope: str
) -> bool:
"""Check if a project has the requested document types."""
if not document_types and search_scope != "all":
return True
docs_dir = Path(project.get("docs_dir", ""))
# Check for specific document types
for doc_type in document_types or []:
if doc_type == "research":
if (docs_dir / "research").exists():
return True
elif doc_type == "architecture":
if (docs_dir / "ARCHITECTURE_GUIDE.md").exists():
return True
elif doc_type == "bugs":
if (docs_dir / "BUG_LOG.md").exists():
return True
elif doc_type == "global":
# Skip global log for individual projects
continue
elif doc_type == "progress":
# All projects have progress logs
return True
return search_scope == "all" # For "all" scope, include all projects
async def _query_file(
project: Dict[str, Any],
*,
limit: Optional[int],
start: Optional[datetime],
end: Optional[datetime],
message: Optional[str],
message_mode: str,
case_sensitive: bool,
agents: Optional[List[str]],
emojis: Optional[List[str]],
meta_filters: Optional[Dict[str, str]],
) -> List[Dict[str, Any]]:
path = Path(project["progress_log"])
lines = await read_all_lines(path)
results: List[Dict[str, Any]] = []
def within_bounds(ts_str: Optional[str]) -> bool:
if not ts_str:
return False
parsed = coerce_range_boundary(ts_str, end=False)
if not parsed:
return False
if start and parsed < start:
return False
if end and parsed > end:
return False
return True
agent_set = set(agents or [])
emoji_set = set(emojis or [])
for line in reversed(lines):
parsed = parse_log_line(line)
if not parsed:
continue
if (start or end) and not within_bounds(parsed.get("ts")):
continue
if agent_set and parsed.get("agent") not in agent_set:
continue
if emoji_set and parsed.get("emoji") not in emoji_set:
continue
if meta_filters and not _meta_matches(parsed.get("meta") or {}, meta_filters):
continue
if not message_matches(
parsed.get("message"),
message,
mode=message_mode,
case_sensitive=case_sensitive,
):
continue
results.append(parsed)
if limit is not None and len(results) >= limit:
break
return results
async def _handle_cross_project_search(
projects: List[Dict[str, Any]],
search_scope: str,
document_types: Optional[List[str]],
start_bound: Optional[datetime],
end_bound: Optional[datetime],
message: Optional[str],
message_mode: str,
case_sensitive: bool,
agents: Optional[List[str]],
emojis: Optional[List[str]],
meta_filters: Optional[Dict[str, str]],
page: int,
page_size: int,
compact: bool,
fields: Optional[List[str]],
include_metadata: bool,
verify_code_references: bool,
relevance_threshold: float,
state_snapshot: Dict[str, Any],
helper: LoggingToolMixin,
context: Optional[LoggingContext],
) -> Dict[str, Any]:
"""Handle cross-project search with result aggregation and pagination."""
all_results: List[Dict[str, Any]] = []
project_context: Dict[str, Dict[str, Any]] = {}
# Search each project and collect results
for project in projects:
project_results = await _search_single_project(
project=project,
document_types=document_types,
start_bound=start_bound,
end_bound=end_bound,
message=message,
message_mode=message_mode,
case_sensitive=case_sensitive,
agents=agents,
emojis=emojis,
meta_filters=meta_filters,
verify_code_references=verify_code_references,
relevance_threshold=relevance_threshold,
)
# Add project context to each result
for result in project_results:
result["project_name"] = project["name"]
result["project_root"] = project.get("root", "")
if include_metadata:
result["project_context"] = {
"docs_dir": project.get("docs_dir", ""),
"progress_log": project.get("progress_log", ""),
}
all_results.extend(project_results)
project_context[project["name"]] = project
# Apply relevance scoring and sorting
if relevance_threshold > 0.0:
all_results = _apply_relevance_scoring(
all_results,
message,
relevance_threshold
)
# Sort by relevance score (descending) then timestamp (most recent first)
all_results.sort(key=lambda x: (
x.get("relevance_score", 0.0),
x.get("timestamp", "")
), reverse=True)
# Apply pagination
total_count = len(all_results)
start_idx, end_idx = _PAGINATION_CALCULATOR.calculate_pagination_indices(page, page_size, total_count)
paginated_results = all_results[start_idx:end_idx]
# Create pagination info
pagination_info = create_pagination_info(page, page_size, total_count)
if context is None:
context = await helper.prepare_context(
tool_name="query_entries",
agent_id=None,
explicit_project=None,
require_project=False,
state_snapshot=state_snapshot,
)
response = helper.success_with_entries(
entries=paginated_results,
context=context,
compact=compact,
fields=fields,
include_metadata=include_metadata,
pagination=pagination_info,
extra_data={
"search_scope": search_scope,
"document_types": document_types,
"projects_searched": list(project_context.keys()),
"total_results_across_projects": total_count,
},
)
# Record token usage
if token_estimator:
token_estimator.record_operation(
operation="query_entries_cross_project",
input_data={
"search_scope": search_scope,
"document_types": document_types,
"projects_count": len(projects),
"page": page,
"page_size": page_size,
"compact": compact,
"fields": fields,
"include_metadata": include_metadata,
"verify_code_references": verify_code_references,
"relevance_threshold": relevance_threshold,
},
response=response,
compact_mode=compact,
page_size=page_size
)
return response
async def _search_single_project(
project: Dict[str, Any],
document_types: Optional[List[str]],
start_bound: Optional[datetime],
end_bound: Optional[datetime],
message: Optional[str],
message_mode: str,
case_sensitive: bool,
agents: Optional[List[str]],
emojis: Optional[List[str]],
meta_filters: Optional[Dict[str, str]],
verify_code_references: bool,
relevance_threshold: float,
) -> List[Dict[str, Any]]:
"""Search a single project including its document types."""
results: List[Dict[str, Any]] = []
# Search main progress log
if not document_types or "progress" in document_types:
progress_results = await _query_file(
project=project,
limit=None, # Get all, we'll paginate later
start=start_bound,
end=end_bound,
message=message,
message_mode=message_mode,
case_sensitive=case_sensitive,
agents=agents,
emojis=emojis,
meta_filters=meta_filters,
)
results.extend(progress_results)
# Search other document types
docs_dir = Path(project.get("docs_dir", ""))
if document_types:
if "research" in document_types:
research_results = await _search_research_documents(
docs_dir, start_bound, end_bound, message, message_mode,
case_sensitive, agents, emojis, meta_filters
)
results.extend(research_results)
if "architecture" in document_types:
arch_results = await _search_architecture_documents(
docs_dir, start_bound, end_bound, message, message_mode,
case_sensitive, agents, emojis, meta_filters
)
results.extend(arch_results)
if "bugs" in document_types:
bug_results = await _search_bug_documents(
docs_dir, start_bound, end_bound, message, message_mode,
case_sensitive, agents, emojis, meta_filters
)
results.extend(bug_results)
if "global" in document_types and project["name"] == "global":
# Global log already handled as progress log for global project
pass
# Apply code reference verification if requested
if verify_code_references:
results = _verify_code_references_in_results(results)
# Add basic relevance scoring
results = _calculate_basic_relevance(results, message)
return results
async def _search_research_documents(
docs_dir: Path,
start_bound: Optional[datetime],
end_bound: Optional[datetime],
message: Optional[str],
message_mode: str,
case_sensitive: bool,
agents: Optional[List[str]],
emojis: Optional[List[str]],
meta_filters: Optional[Dict[str, str]],
) -> List[Dict[str, Any]]:
"""Search research documents in the research/ subdirectory."""
results: List[Dict[str, Any]] = []
research_dir = docs_dir / "research"
if not research_dir.exists():
return results
# Find all research markdown files
for research_file in research_dir.glob("RESEARCH_*.md"):
try:
content = await read_all_lines(research_file)
# Parse research documents and extract searchable content
research_results = _parse_research_document(
research_file, content, start_bound, end_bound,
message, message_mode, case_sensitive, agents, emojis, meta_filters
)
results.extend(research_results)
except Exception:
# Skip unreadable or malformed research files but continue with the rest
continue
return results
async def _search_architecture_documents(
docs_dir: Path,
start_bound: Optional[datetime],
end_bound: Optional[datetime],
message: Optional[str],
message_mode: str,
case_sensitive: bool,
agents: Optional[List[str]],
emojis: Optional[List[str]],
meta_filters: Optional[Dict[str, str]],
) -> List[Dict[str, Any]]:
"""Search architecture guide documents."""
results: List[Dict[str, Any]] = []
arch_file = docs_dir / "ARCHITECTURE_GUIDE.md"
if not arch_file.exists():
return results
try:
content = await read_all_lines(arch_file)
arch_results = _parse_markdown_document(
arch_file, content, "architecture", start_bound, end_bound,
message, message_mode, case_sensitive, agents, emojis, meta_filters
)
results.extend(arch_results)
except Exception:
pass
return results
async def _search_bug_documents(
docs_dir: Path,
start_bound: Optional[datetime],
end_bound: Optional[datetime],
message: Optional[str],
message_mode: str,
case_sensitive: bool,
agents: Optional[List[str]],
emojis: Optional[List[str]],
meta_filters: Optional[Dict[str, str]],
) -> List[Dict[str, Any]]:
"""Search bug report documents."""
results: List[Dict[str, Any]] = []
bug_file = docs_dir / "BUG_LOG.md"
if not bug_file.exists():
return results
try:
content = await read_all_lines(bug_file)
bug_results = _parse_markdown_document(
bug_file, content, "bugs", start_bound, end_bound,
message, message_mode, case_sensitive, agents, emojis, meta_filters
)
results.extend(bug_results)
except Exception:
pass
return results
def _parse_research_document(
file_path: Path,
content: List[str],
start_bound: Optional[datetime],
end_bound: Optional[datetime],
message: Optional[str],
message_mode: str,
case_sensitive: bool,
agents: Optional[List[str]],
emojis: Optional[List[str]],
meta_filters: Optional[Dict[str, str]],
) -> List[Dict[str, Any]]:
"""Parse research document into searchable entries."""
results: List[Dict[str, Any]] = []
# Extract sections from research document
current_section = ""
section_content = []
for line in content:
if line.startswith("# "):
# New top-level section
if current_section and section_content:
result = _create_document_entry(
file_path, current_section, section_content,
"research", start_bound, end_bound, message,
message_mode, case_sensitive, agents, emojis, meta_filters
)
if result:
results.append(result)
current_section = line.strip()
section_content = []
elif line.strip():
section_content.append(line)
# Don't forget the last section
if current_section and section_content:
result = _create_document_entry(
file_path, current_section, section_content,
"research", start_bound, end_bound, message,
message_mode, case_sensitive, agents, emojis, meta_filters
)
if result:
results.append(result)
return results
def _parse_markdown_document(
file_path: Path,
content: List[str],
doc_type: str,
start_bound: Optional[datetime],
end_bound: Optional[datetime],
message: Optional[str],
message_mode: str,
case_sensitive: bool,
agents: Optional[List[str]],
emojis: Optional[List[str]],
meta_filters: Optional[Dict[str, str]],
) -> List[Dict[str, Any]]:
"""Parse generic markdown document into searchable entries."""
results: List[Dict[str, Any]] = []
# Split document into sections based on headers
current_section = ""
section_content = []
for line in content:
if line.startswith("#"):
# New section
if current_section and section_content:
result = _create_document_entry(
file_path, current_section, section_content,
doc_type, start_bound, end_bound, message,
message_mode, case_sensitive, agents, emojis, meta_filters
)
if result:
results.append(result)
current_section = line.strip()
section_content = []
elif line.strip():
section_content.append(line)
# Process final section
if current_section and section_content:
result = _create_document_entry(
file_path, current_section, section_content,
doc_type, start_bound, end_bound, message,
message_mode, case_sensitive, agents, emojis, meta_filters
)
if result:
results.append(result)
return results
def _create_document_entry(
file_path: Path,
section: str,
content: List[str],
doc_type: str,
start_bound: Optional[datetime],
end_bound: Optional[datetime],
message: Optional[str],
message_mode: str,
case_sensitive: bool,
agents: Optional[List[str]],
emojis: Optional[List[str]],
meta_filters: Optional[Dict[str, str]],
) -> Optional[Dict[str, Any]]:
"""Create a searchable entry from document section."""
# Combine section and content for searching
full_text = section + "\n" + "\n".join(content)
# Apply message filtering
if message and not message_matches(
full_text, message, mode=message_mode, case_sensitive=case_sensitive
):
return None
# Attach basic metadata identifying the source document
meta = {"document_type": doc_type, "source_file": str(file_path)}
# Apply other filters (simplified for document entries)
if agents and not any(agent in full_text for agent in agents):
return None
if emojis and not any(emoji in full_text for emoji in emojis):
return None
if meta_filters:
for key, value in meta_filters.items():
if key not in meta or str(meta[key]) != str(value):
return None
# Create entry
return {
"timestamp": format_utc(), # Use current time for document entries
"message": section.strip(),
"agent": "DocumentParser",
"emoji": "📄",
"status": "info",
"meta": meta,
"content": full_text,
"relevance_score": 0.5, # Default relevance for document entries
}
def _verify_code_references_in_results(results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Verify code file references exist and add warnings for broken ones."""
updated_results = []
for result in results:
# Look for file path patterns in the message or content
text_content = result.get("message", "") + " " + result.get("content", "")
# Find potential file references; a non-capturing group keeps findall
# returning whole paths (a capturing group would return only the extensions),
# and \b avoids anchoring the match to the very end of the text.
file_pattern = r'[\w\-/.]+\.(?:py|js|ts|md|json|yaml|yml|sql|sh|bash|zsh)\b'
potential_files = re.findall(file_pattern, text_content, re.IGNORECASE)
verified_result = result.copy()
broken_refs = []
for file_ref in potential_files:
# Try to find the file in the current project
if not _verify_file_exists(file_ref):
broken_refs.append(file_ref)
if broken_refs:
verified_result["meta"] = verified_result.get("meta", {})
verified_result["meta"]["broken_code_references"] = broken_refs
verified_result["meta"]["code_reference_verification"] = "failed"
# Add warning emoji if code references are broken
if result.get("emoji") != "⚠️":
verified_result["emoji"] = "⚠️"
else:
verified_result["meta"] = verified_result.get("meta", {})
verified_result["meta"]["code_reference_verification"] = "passed"
updated_results.append(verified_result)
return updated_results
def _verify_file_exists(file_ref: str) -> bool:
"""Check if a referenced file exists in the current codebase."""
# Try different common locations
potential_paths = [
Path(file_ref), # Relative to current
Path("scribe_mcp") / file_ref, # In scribe_mcp
Path("scribe_mcp/tools") / file_ref, # In tools
Path("scribe_mcp/storage") / file_ref, # In storage
Path("docs") / file_ref, # In docs
Path("tests") / file_ref, # In tests
]
for path in potential_paths:
if path.exists():
return True
return False
def _calculate_basic_relevance(results: List[Dict[str, Any]], query_message: Optional[str]) -> List[Dict[str, Any]]:
"""Calculate basic relevance scores for search results."""
if not query_message:
# Default relevance for non-message queries
for result in results:
result["relevance_score"] = 0.5
return results
query_terms = query_message.lower().split()
for result in results:
score = 0.0
text = (result.get("message", "") + " " + result.get("content", "")).lower()
# Score based on term frequency
for term in query_terms:
if term in text:
score += 1.0
# Bonus for exact phrase matches
if query_message.lower() in text:
score += 2.0
# Bonus for recent entries (higher score for newer entries)
timestamp = result.get("timestamp", "")
if timestamp:
try:
# datetime is already imported at module level; .days is always an int
dt = datetime.fromisoformat(timestamp.replace(" UTC", ""))
days_ago = (datetime.now(dt.tzinfo) - dt).days
if days_ago <= 7:
score += 0.5
elif days_ago <= 30:
score += 0.25
except (ValueError, TypeError):
pass
result["relevance_score"] = min(score / len(query_terms) if query_terms else 0.5, 1.0)
return results
def _apply_relevance_scoring(results: List[Dict[str, Any]], query_message: Optional[str], threshold: float) -> List[Dict[str, Any]]:
"""Apply relevance threshold filtering using BulkProcessor utility."""
return BulkProcessor.filter_by_relevance_threshold(results, threshold, "relevance_score")
def _resolve_emojis(
emojis: Optional[List[str]],
statuses: Optional[List[str]],
) -> List[str]:
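"""Merge explicit emojis with emojis derived from status names, deduplicated in input order."""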
result: List[str] = []
seen = set()
for item in shared_clean_list(emojis, coerce_lower=False):
if item and item not in seen:
result.append(item)
seen.add(item)
for status in shared_clean_list(statuses, coerce_lower=True):
if not status:
continue
# shared_clean_list already lower-cases statuses, so a single lookup suffices
emoji = STATUS_EMOJI.get(status)
if emoji and emoji not in seen:
result.append(emoji)
seen.add(emoji)
return result
def _clean_list(items: Optional[List[str]]) -> List[str]:
return shared_clean_list(items, coerce_lower=False)
def _normalise_boundary(value: Optional[str], *, end: bool) -> Tuple[Optional[datetime], Optional[str]]:
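"""Parse a range boundary string, returning (datetime, None) on success or (None, error_message) on failure."""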
if value is None:
return None, None
parsed = coerce_range_boundary(value, end=end)
if parsed is None:
return None, f"Unable to parse timestamp '{value}'. Expected ISO8601 or 'YYYY-MM-DD HH:MM:SS UTC'."
return parsed, None
def _meta_matches(
entry_meta: Dict[str, str],
filters: Dict[str, str],
) -> bool:
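"""Return True only when every filter key/value pair matches the entry metadata exactly.

Minimal illustration (hypothetical values):
    _meta_matches({"phase": "3"}, {"phase": "3"})  -> True
    _meta_matches({"phase": "3"}, {"phase": "4"})  -> False
"""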
for key, expected in filters.items():
if entry_meta.get(key) != expected:
return False
return True