Skip to main content
Glama
file_command.py (26.8 kB)
"""Parsing utilities for the client-side ``filetool`` command.""" from __future__ import annotations import argparse import re import sys from collections.abc import Callable from dataclasses import dataclass from pathlib import Path from typing import Sequence, TextIO __all__ = [ "FileCommandError", "FileCommandHelp", "FileCommandParseError", "ManageFileCommand", "EscapeProfileContext", "list_escape_profiles", "parse_manage_file_command", "register_escape_profile", "render_manage_file_help", ] _ALLOWED_OPERATIONS = ( "append", "create", "delete", "insert", "locate", "patch", "prepend", "print", "stat", "replace", "substitute", "write", ) _INLINE_CONTENT_OPERATIONS = frozenset({ "append", "create", "insert", "patch", "prepend", "replace", "substitute", "write", }) _ESCAPE_SEQUENCE_PATTERN = re.compile(r"\\([nrt0])") def _decode_inline_escape_sequences(text: str) -> str: """Return ``text`` with common escape sequences expanded. The decoding intentionally focuses on the sequences most frequently used when authors need to express multi-line payloads from single-line shells. Only the escapes recognised by :class:`_ESCAPE_SEQUENCE_PATTERN` are expanded so that other backslash-prefixed values remain untouched (for example the ``\\`` that prefixes ``"\\ No newline at end of file"`` lines in unified diffs). """ def _replace(match: re.Match[str]) -> str: mapping = { "n": "\n", "r": "\r", "t": "\t", "0": "\0", } return mapping.get(match.group(1), match.group(0)) return _ESCAPE_SEQUENCE_PATTERN.sub(_replace, text) def _should_decode_inline_escape_sequences(text: str) -> bool: """Return ``True`` when ``text`` should be interpreted for inline escapes.""" if "\n" in text or "\r" in text: # Real newlines are already present so the caller was able to supply # structured content directly; avoid rewriting the payload. 
return False return _ESCAPE_SEQUENCE_PATTERN.search(text) is not None @dataclass(slots=True) class EscapeProfileContext: """Context describing when an escape profile is being applied.""" operation: str namespace: argparse.Namespace EscapeProfileHandler = Callable[[str, EscapeProfileContext], str] _ESCAPE_PROFILE_HANDLERS: dict[str, EscapeProfileHandler] = {} def register_escape_profile( name: str, handler: EscapeProfileHandler, *, replace: bool = False, ) -> None: """Register a new inline escape decoding profile. Parameters ---------- name: Human readable identifier referenced by the ``--escape-profile`` flag. handler: Callable responsible for transforming inline content based on the provided :class:`EscapeProfileContext`. replace: When ``True`` an existing profile with the same name is overwritten. Raises ------ ValueError Raised when ``name`` is empty/whitespace only, ``handler`` is not callable, or the profile already exists without ``replace`` enabled. """ if not callable(handler): # pragma: no cover - defensive guard raise ValueError("Escape profile handler must be callable") normalized = name.strip().lower() if not normalized: raise ValueError("Escape profile name must not be empty") if not replace and normalized in _ESCAPE_PROFILE_HANDLERS: raise ValueError(f"Escape profile '{normalized}' is already registered") _ESCAPE_PROFILE_HANDLERS[normalized] = handler def list_escape_profiles() -> tuple[str, ...]: """Return the registered escape profile names in alphabetical order.""" return tuple(sorted(_ESCAPE_PROFILE_HANDLERS)) def _normalize_escape_profile(name: str, *, usage: str | None = None) -> str: normalized = name.strip().lower() if not normalized: raise FileCommandParseError( "--escape-profile requires a non-empty value", usage=usage, ) if normalized not in _ESCAPE_PROFILE_HANDLERS: available = ", ".join(list_escape_profiles()) or "<none>" raise FileCommandParseError( f"Unknown escape profile '{name}'. 
Available profiles: {available}", usage=usage, ) return normalized def _auto_escape_profile_handler(text: str, context: EscapeProfileContext) -> str: if context.operation in _INLINE_CONTENT_OPERATIONS and _should_decode_inline_escape_sequences(text): return _decode_inline_escape_sequences(text) return text def _literal_escape_profile_handler(text: str, _context: EscapeProfileContext) -> str: return text def _ensure_builtin_profiles_registered() -> None: if "auto" not in _ESCAPE_PROFILE_HANDLERS: register_escape_profile("auto", _auto_escape_profile_handler) if "none" not in _ESCAPE_PROFILE_HANDLERS: register_escape_profile("none", _literal_escape_profile_handler) _ensure_builtin_profiles_registered() class FileCommandError(ValueError): """Base exception raised when parsing `filetool` arguments.""" class FileCommandHelp(FileCommandError): """Raised when help text should be displayed instead of executing the command.""" def __init__(self, help_text: str) -> None: super().__init__("help requested") self.help_text = help_text class FileCommandParseError(FileCommandError): """Raised when command arguments are invalid.""" def __init__(self, message: str, usage: str | None = None) -> None: super().__init__(message) self.usage = usage @dataclass(slots=True) class ManageFileCommand: """Represents a fully parsed ``filetool`` invocation.""" operation: str path: str content: str | None pattern: str | None line: int | None start_line: int | None end_line: int | None encoding: str create_parents: bool overwrite: bool create_if_missing: bool escape_profile: str follow_symlinks: bool output_format: str use_regex: bool ignore_case: bool max_replacements: int | None anchor_text: str | None anchor_use_regex: bool anchor_ignore_case: bool anchor_after: bool anchor_occurrence: int | None class _ArgumentParser(argparse.ArgumentParser): """Custom parser that raises exceptions instead of exiting.""" def error(self, message: str) -> None: # pragma: no cover - handled by caller raise 
FileCommandParseError(message, usage=self.format_usage()) def _create_parser() -> _ArgumentParser: description = ( "Call the server's manage_file tool to create, modify, inspect, or inventory remote files." ) epilog = """Operations: create Create or overwrite a file with optional content. write Replace the entire file contents. append Append text to the end of a file, optionally creating it. prepend Insert text at the beginning of a file while honouring creation flags. insert Insert new lines before a line number or text anchor. replace Replace a range of lines with new content. delete Remove a range of lines entirely. print Display the requested line range with numbering. locate Report line numbers containing the provided search text. patch Apply a unified diff patch to an existing file. substitute Replace occurrences of a pattern with new text using literal or regex matching. stat Display filesystem metadata for the target path. """ parser = _ArgumentParser( prog="filetool", description=description, epilog=epilog, add_help=False, formatter_class=argparse.RawDescriptionHelpFormatter, ) _ensure_builtin_profiles_registered() available_profiles = ", ".join(list_escape_profiles()) parser.add_argument("operation", choices=_ALLOWED_OPERATIONS) parser.add_argument("path", help="Path to the remote file (relative to the server root).") parser.add_argument( "--content", "-c", dest="content", help=( "Inline text content for operations that require it. " "When the payload contains escape sequences such as \\n or \\t and no " "literal newlines, those escapes are expanded automatically so single-line shells can " "submit multi-line data." 
), ) parser.add_argument( "--content-from-file", dest="content_file", help="Load content from a local file using the specified encoding.", ) parser.add_argument( "--stdin", dest="read_stdin", action="store_true", help="Read content from standard input until EOF.", ) parser.add_argument( "--escape-profile", default="auto", help=( "Selects the inline escape decoding strategy. Available profiles: " f"{available_profiles or 'auto, none'}." ), ) parser.add_argument( "--pattern", help=( "Pattern text consumed by the substitute operation. Patterns default to literal matching " "but can be interpreted as regular expressions via --regex." ), ) parser.add_argument( "--pattern-from-file", dest="pattern_file", help="Load the substitute pattern from a local file using the specified encoding.", ) parser.add_argument( "--regex", dest="use_regex", action="store_true", default=False, help="Interpret the substitute pattern as a Python regular expression.", ) parser.add_argument( "--ignore-case", dest="ignore_case", action="store_true", default=False, help="Perform case-insensitive matching for the substitute operation.", ) parser.add_argument( "--max-replacements", dest="max_replacements", type=int, help=( "Limit the number of replacements performed by substitute. Omit the option to replace all matches." ), ) parser.add_argument( "--anchor", dest="anchor", help=( "Anchor text used by the insert operation to automatically locate the insertion point." 
), ) parser.add_argument( "--anchor-from-file", dest="anchor_file", help="Load anchor text from a local file using the specified encoding.", ) parser.add_argument( "--anchor-regex", dest="anchor_use_regex", action="store_true", default=False, help="Interpret the anchor text as a Python regular expression when inserting.", ) parser.add_argument( "--anchor-ignore-case", dest="anchor_ignore_case", action="store_true", default=False, help="Perform case-insensitive anchor matching (works with literal and regex anchors).", ) parser.add_argument( "--anchor-after", dest="anchor_after", action="store_true", default=False, help="Insert content after the matched anchor line instead of before it.", ) parser.add_argument( "--anchor-occurrence", dest="anchor_occurrence", type=int, help=( "When multiple lines match the anchor, choose which occurrence to target (1-based index)." ), ) parser.add_argument( "--encoding", default="utf-8", help="Text encoding used when reading or writing files.", ) parser.add_argument( "--format", dest="output_format", choices=("human", "json"), default="human", help=( "Select the local rendering style for responses. " "The 'human' format prints descriptive text while 'json' emits machine-readable metadata." 
), ) parser.add_argument( "--line", type=int, help="Line number used by the insert operation.", ) parser.add_argument( "--start-line", type=int, help="Start line for replace, delete, or print operations.", ) parser.add_argument( "--end-line", type=int, help="End line for replace, delete, or print operations.", ) parser.add_argument( "--create-parents", action="store_true", help="Create parent directories automatically when writing files.", ) parser.add_argument( "--overwrite", action="store_true", help="Allow the create operation to overwrite an existing file.", ) parser.add_argument( "--no-overwrite", action="store_false", dest="overwrite", help="Prevent create from overwriting existing files (default).", ) parser.add_argument( "--create-if-missing", action="store_true", dest="create_if_missing", default=True, help="Allow append to create files that do not yet exist (default).", ) parser.add_argument( "--no-create-if-missing", action="store_false", dest="create_if_missing", help="Require append to target an existing file.", ) parser.add_argument( "--follow-symlinks", dest="follow_symlinks", action="store_true", default=True, help="Follow symbolic links when inspecting metadata (default).", ) parser.add_argument( "--no-follow-symlinks", dest="follow_symlinks", action="store_false", help="Inspect symbolic link metadata without resolving the target.", ) return parser def render_manage_file_help() -> str: """Return the formatted help text for the ``filetool`` command.""" parser = _create_parser() help_text = parser.format_help().rstrip() if help_text: help_text += "\n" help_text += "Use `filetool --help` to display this message." 
return help_text def parse_manage_file_command( arguments: Sequence[str], *, stdin: TextIO | None = None, ) -> ManageFileCommand: """Parse raw argument tokens into a :class:`ManageFileCommand`.""" parser = _create_parser() if any(token in {"-h", "--help"} for token in arguments): raise FileCommandHelp(render_manage_file_help()) try: namespace = parser.parse_args(arguments) except FileCommandParseError: raise except SystemExit as exc: # pragma: no cover - argparse fallback safeguard raise FileCommandParseError( f"argument parsing failed with exit status {exc.code}", usage=parser.format_usage(), ) from exc operation = namespace.operation stdin_stream = stdin if stdin is not None else sys.stdin usage = parser.format_usage() escape_profile = _normalize_escape_profile(namespace.escape_profile, usage=usage) namespace.escape_profile = escape_profile content = _resolve_content( namespace, stdin_stream, operation=operation, escape_profile=escape_profile, ) pattern = _resolve_pattern( namespace, encoding=namespace.encoding, operation=operation, ) anchor_text = _resolve_anchor( namespace, encoding=namespace.encoding, operation=operation, usage=usage, ) path = namespace.path encoding = namespace.encoding line = namespace.line start_line = namespace.start_line end_line = namespace.end_line anchor_occurrence = namespace.anchor_occurrence if anchor_text is not None and anchor_occurrence is None: anchor_occurrence = 1 _validate_arguments( operation, line, start_line, end_line, content, pattern, bool(namespace.use_regex), bool(namespace.ignore_case), namespace.max_replacements, anchor_text, bool(namespace.anchor_use_regex), bool(namespace.anchor_ignore_case), bool(namespace.anchor_after), anchor_occurrence, ) if operation == "stat": content = None return ManageFileCommand( operation=operation, path=path, content=content, pattern=pattern, line=line, start_line=start_line, end_line=end_line, encoding=encoding, create_parents=bool(namespace.create_parents), 
overwrite=bool(namespace.overwrite), create_if_missing=bool(namespace.create_if_missing), escape_profile=escape_profile, follow_symlinks=bool(namespace.follow_symlinks), output_format=str(namespace.output_format), use_regex=bool(namespace.use_regex), ignore_case=bool(namespace.ignore_case), max_replacements=namespace.max_replacements, anchor_text=anchor_text, anchor_use_regex=bool(namespace.anchor_use_regex), anchor_ignore_case=bool(namespace.anchor_ignore_case), anchor_after=bool(namespace.anchor_after), anchor_occurrence=anchor_occurrence, ) def _resolve_content( namespace: argparse.Namespace, stdin_stream: TextIO, *, operation: str, escape_profile: str, ) -> str | None: """Determine the content payload from the parsed namespace.""" sources = [ name for name, enabled in { "content": namespace.content is not None, "content_file": namespace.content_file is not None, "stdin": bool(namespace.read_stdin), }.items() if enabled ] if len(sources) > 1: raise FileCommandParseError( "Only one of --content, --content-from-file, or --stdin may be provided.", usage=_create_parser().format_usage(), ) if not sources: return None if namespace.content is not None: content = namespace.content handler = _ESCAPE_PROFILE_HANDLERS[escape_profile] context = EscapeProfileContext(operation=operation, namespace=namespace) return handler(content, context) if namespace.content_file is not None: path = Path(namespace.content_file).expanduser() try: return path.read_text(encoding=namespace.encoding) except FileNotFoundError as exc: raise FileCommandParseError(f"Content file not found: {path}") from exc except OSError as exc: raise FileCommandParseError(f"Failed to read content file: {path}: {exc}") from exc try: return stdin_stream.read() except OSError as exc: # pragma: no cover - extremely rare raise FileCommandParseError(f"Failed to read stdin: {exc}") from exc def _resolve_pattern( namespace: argparse.Namespace, *, encoding: str, operation: str, ) -> str | None: """Resolve the pattern 
payload used by the substitute operation.""" sources = [ name for name, enabled in { "pattern": namespace.pattern is not None, "pattern_file": namespace.pattern_file is not None, }.items() if enabled ] if operation != "substitute" and sources: raise FileCommandParseError( "--pattern is only valid for the substitute operation", usage=_create_parser().format_usage(), ) if len(sources) > 1: raise FileCommandParseError( "Only one of --pattern or --pattern-from-file may be provided.", usage=_create_parser().format_usage(), ) if not sources: return None if namespace.pattern_file is not None: path = Path(namespace.pattern_file).expanduser() try: return path.read_text(encoding=encoding) except FileNotFoundError as exc: raise FileCommandParseError(f"Pattern file not found: {path}") from exc except OSError as exc: raise FileCommandParseError(f"Failed to read pattern file: {path}: {exc}") from exc assert namespace.pattern is not None return namespace.pattern def _resolve_anchor( namespace: argparse.Namespace, *, encoding: str, operation: str, usage: str, ) -> str | None: """Resolve anchor text for the insert operation.""" sources = [ name for name, enabled in { "anchor": namespace.anchor is not None, "anchor_file": namespace.anchor_file is not None, }.items() if enabled ] if operation != "insert" and sources: raise FileCommandParseError( "Anchor arguments are only valid for the insert operation", usage=usage, ) if len(sources) > 1: raise FileCommandParseError( "Only one of --anchor or --anchor-from-file may be provided.", usage=usage, ) if not sources: return None if namespace.anchor_file is not None: path = Path(namespace.anchor_file).expanduser() try: text = path.read_text(encoding=encoding) except FileNotFoundError as exc: raise FileCommandParseError(f"Anchor file not found: {path}") from exc except OSError as exc: raise FileCommandParseError(f"Failed to read anchor file: {path}: {exc}") from exc if text.strip() == "": raise FileCommandParseError("Anchor text loaded from 
file must not be empty") return text assert namespace.anchor is not None if namespace.anchor.strip() == "": raise FileCommandParseError("--anchor requires non-empty text", usage=usage) return namespace.anchor def _validate_arguments( operation: str, line: int | None, start_line: int | None, end_line: int | None, content: str | None, pattern: str | None, use_regex: bool, ignore_case: bool, max_replacements: int | None, anchor_text: str | None, anchor_use_regex: bool, anchor_ignore_case: bool, anchor_after: bool, anchor_occurrence: int | None, ) -> None: """Validate operation-specific requirements.""" if operation == "stat": # Metadata inspections accept dedicated filters that do not overlap with the # traditional line-focused options. Skip the remaining validation logic so # forward-compatible filters can be introduced without tightening checks # here prematurely. return anchor_arguments_used = any( [ anchor_text is not None, anchor_use_regex, anchor_ignore_case, anchor_after, anchor_occurrence is not None, ] ) if anchor_arguments_used and operation != "insert": raise FileCommandParseError( "Anchor options are only valid for the insert operation", ) if anchor_occurrence is not None and anchor_occurrence <= 0: raise FileCommandParseError("--anchor-occurrence must be greater than zero") if line is not None: if line <= 0: raise FileCommandParseError("--line must be a positive integer") if operation != "insert": raise FileCommandParseError("--line is only valid for the insert operation") if operation == "insert": if anchor_text is not None and line is not None: raise FileCommandParseError( "Insert accepts either --line or anchor arguments, not both", ) if anchor_text is None and ( anchor_use_regex or anchor_ignore_case or anchor_after or anchor_occurrence is not None ): raise FileCommandParseError( "Anchor modifiers require --anchor or --anchor-from-file", ) if anchor_text is None and line is None: raise FileCommandParseError( "Insert requires either --line or anchor 
arguments", ) if start_line is not None: if start_line <= 0: raise FileCommandParseError("--start-line must be a positive integer") if operation not in {"replace", "delete", "print"}: raise FileCommandParseError( "--start-line is only valid for replace, delete, or print operations", ) if end_line is not None: if end_line <= 0: raise FileCommandParseError("--end-line must be a positive integer") if start_line is None: raise FileCommandParseError("--end-line requires --start-line to be set") if end_line < start_line: raise FileCommandParseError("--end-line cannot be less than --start-line") if operation in {"write", "append", "insert"} and content is None: raise FileCommandParseError( f"The {operation} operation requires content via --content, --content-from-file, or --stdin.", ) if operation == "prepend" and content is None: raise FileCommandParseError( "The prepend operation requires content via --content, --content-from-file, or --stdin.", ) if operation == "patch" and content is None: raise FileCommandParseError( "The patch operation requires a unified diff via --content, --content-from-file, or --stdin.", ) if operation == "locate": if content is None or content.strip() == "": raise FileCommandParseError("locate requires non-empty content text") if max_replacements is not None: if max_replacements <= 0: raise FileCommandParseError("--max-replacements must be greater than zero") if operation != "substitute": raise FileCommandParseError( "--max-replacements is only valid for the substitute operation", ) if (use_regex or ignore_case) and operation != "substitute": raise FileCommandParseError( "--regex and --ignore-case are only valid for the substitute operation", ) if operation == "substitute": if pattern is None: raise FileCommandParseError( "substitute requires a search pattern via --pattern or --pattern-from-file", ) if content is None: raise FileCommandParseError( "substitute requires replacement content via --content, --content-from-file, or --stdin.", ) if 
operation == "replace" and content is None: # Empty string is valid and should replace the range with nothing. return if operation == "create" and content is None: return if operation in {"delete", "print"} and content is not None: raise FileCommandParseError( f"The {operation} operation does not accept content arguments.", )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/FreddyE1982/mcp2term'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.