Skip to main content
Glama

Scribe MCP Server

by paxocial
query_entries.py8.68 kB
"""Advanced querying for progress log entries.""" from __future__ import annotations import re from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from scribe_mcp import server as server_module from scribe_mcp.server import app from scribe_mcp.tools.constants import STATUS_EMOJI from scribe_mcp.tools.project_utils import load_active_project, load_project_config from scribe_mcp.utils.logs import parse_log_line, read_all_lines from scribe_mcp.utils.search import message_matches from scribe_mcp.utils.time import coerce_range_boundary from scribe_mcp import reminders VALID_MESSAGE_MODES = {"substring", "regex", "exact"} META_KEY_PATTERN = re.compile(r"^[A-Za-z0-9_.:-]+$") @app.tool() async def query_entries( project: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None, message: Optional[str] = None, message_mode: str = "substring", case_sensitive: bool = False, emoji: Optional[List[str]] = None, status: Optional[List[str]] = None, agents: Optional[List[str]] = None, meta_filters: Optional[Dict[str, Any]] = None, limit: int = 100, ) -> Dict[str, Any]: """Search the project log with flexible filters.""" state_snapshot = await server_module.state_manager.record_tool("query_entries") limit = max(1, min(limit, 500)) message_mode = (message_mode or "substring").lower() if message_mode not in VALID_MESSAGE_MODES: return {"ok": False, "error": f"Unsupported message_mode '{message_mode}'."} if message and message_mode == "regex": try: re.compile(message) except re.error as exc: return {"ok": False, "error": f"Invalid regex for message filter: {exc}"} normalised_emoji = _resolve_emojis(emoji, status) meta_normalised, meta_error = _normalise_meta_filters(meta_filters) if meta_error: return {"ok": False, "error": meta_error} start_bound, start_error = _normalise_boundary(start, end=False) if start_error: return {"ok": False, "error": start_error} end_bound, end_error = _normalise_boundary(end, end=True) if end_error: return {"ok": False, "error": end_error} project_data, active_name, recent = await _resolve_project(project) reminders_payload: List[Dict[str, Any]] = [] if not project_data: return { "ok": False, "error": f"Project '{project or active_name}' is not configured.", "recent_projects": list(recent), "reminders": reminders_payload, } backend = server_module.storage_backend if backend: record = await backend.fetch_project(project_data["name"]) if not record: record = await backend.upsert_project( name=project_data["name"], repo_root=project_data["root"], progress_log_path=project_data["progress_log"], ) rows = await backend.query_entries( project=record, limit=limit, start=start_bound.isoformat() if start_bound else None, end=end_bound.isoformat() if end_bound else None, agents=_clean_list(agents), emojis=normalised_emoji or None, message=message, message_mode=message_mode, case_sensitive=case_sensitive, meta_filters=meta_normalised or None, ) reminders_payload = await reminders.get_reminders( project_data, tool_name="query_entries", state=state_snapshot, ) return { "ok": True, "entries": rows, "recent_projects": list(recent), "reminders": reminders_payload, } return { "ok": True, "entries": await _query_file( project_data, limit=limit, start=start_bound, end=end_bound, message=message, message_mode=message_mode, case_sensitive=case_sensitive, agents=_clean_list(agents), emojis=normalised_emoji or None, meta_filters=meta_normalised or None, ), "recent_projects": list(recent), "reminders": await reminders.get_reminders( project_data, tool_name="query_entries", state=state_snapshot, ), } async def _resolve_project( project_name: Optional[str], ) -> Tuple[Optional[Dict[str, Any]], Optional[str], Tuple[str, ...]]: if not project_name: return await load_active_project(server_module.state_manager) state = await server_module.state_manager.load() project = state.get_project(project_name) if project: return project, project_name, tuple(state.recent_projects) config = load_project_config(project_name) return config, project_name, tuple(state.recent_projects) async def _query_file( project: Dict[str, Any], *, limit: int, start: Optional[datetime], end: Optional[datetime], message: Optional[str], message_mode: str, case_sensitive: bool, agents: Optional[List[str]], emojis: Optional[List[str]], meta_filters: Optional[Dict[str, str]], ) -> List[Dict[str, Any]]: path = Path(project["progress_log"]) lines = await read_all_lines(path) results: List[Dict[str, Any]] = [] def within_bounds(ts_str: Optional[str]) -> bool: if not ts_str: return False parsed = coerce_range_boundary(ts_str, end=False) if not parsed: return False if start and parsed < start: return False if end and parsed > end: return False return True agent_set = set(agents or []) emoji_set = set(emojis or []) for line in reversed(lines): parsed = parse_log_line(line) if not parsed: continue if (start or end) and not within_bounds(parsed.get("ts")): continue if agent_set and parsed.get("agent") not in agent_set: continue if emoji_set and parsed.get("emoji") not in emoji_set: continue if meta_filters and not _meta_matches(parsed.get("meta") or {}, meta_filters): continue if not message_matches( parsed.get("message"), message, mode=message_mode, case_sensitive=case_sensitive, ): continue results.append(parsed) if len(results) >= limit: break return results def _resolve_emojis( emojis: Optional[List[str]], statuses: Optional[List[str]], ) -> List[str]: result: List[str] = [] seen = set() for item in emojis or []: if item and item not in seen: result.append(item) seen.add(item) for status in statuses or []: if not status: continue emoji = STATUS_EMOJI.get(status, STATUS_EMOJI.get(status.lower())) if not emoji: normalized = status.lower() emoji = STATUS_EMOJI.get(normalized) if emoji and emoji not in seen: result.append(emoji) seen.add(emoji) return result def _clean_list(items: Optional[List[str]]) -> List[str]: if not items: return [] return [item for item in (entry.strip() for entry in items) if item] def _normalise_meta_filters( meta_filters: Optional[Dict[str, Any]], ) -> Tuple[Dict[str, str], Optional[str]]: if not meta_filters: return {}, None normalised: Dict[str, str] = {} for key, value in meta_filters.items(): if key is None: return {}, "Meta filter keys cannot be null." key_str = str(key).strip() if not key_str: return {}, "Meta filter keys cannot be empty." if not META_KEY_PATTERN.match(key_str): return {}, f"Meta filter key '{key}' contains unsupported characters." normalised[key_str] = str(value) return normalised, None def _normalise_boundary(value: Optional[str], *, end: bool) -> Tuple[Optional[datetime], Optional[str]]: if value is None: return None, None parsed = coerce_range_boundary(value, end=end) if parsed is None: return None, f"Unable to parse timestamp '{value}'. Expected ISO8601 or 'YYYY-MM-DD HH:MM:SS UTC'." return parsed, None def _meta_matches( entry_meta: Dict[str, str], filters: Dict[str, str], ) -> bool: for key, expected in filters.items(): if entry_meta.get(key) != expected: return False return True

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/paxocial/scribe_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server