BitSight Community MCP Server

by boecht
logs.py (16.2 kB)
"""Log maintenance commands for BiRRe CLI.""" from __future__ import annotations from datetime import UTC, datetime, timedelta from pathlib import Path from typing import Any import typer from rich.console import Console from birre.cli import options as cli_options from birre.cli.invocation import ( LoggingCliInputs, build_invocation, resolve_runtime_and_logging, ) from birre.cli.models import CliInvocation, LogViewLine from birre.cli.runtime import CONTEXT_CHOICES from birre.cli.validation import validate_path_exists from birre.config.constants import DEFAULT_CONFIG_FILENAME from birre.domain.company_rating.constants import DEFAULT_LOG_TAIL_LINES def _rotate_logs(base_path: Path, backup_count: int) -> None: """Rotate log files with numbered backups.""" if backup_count <= 0: base_path.write_text("", encoding="utf-8") return for index in range(backup_count, 0, -1): source = base_path.with_name(f"{base_path.name}.{index}") target = base_path.with_name(f"{base_path.name}.{index + 1}") if validate_path_exists(source): source.replace(target) if validate_path_exists(base_path): base_path.replace(base_path.with_name(f"{base_path.name}.1")) base_path.touch() def _resolve_logging_settings_from_cli( *, config_path: Path | None, log_level: str | None, log_format: str | None, log_file: str | None, log_max_bytes: int | None, log_backup_count: int | None, ) -> tuple[CliInvocation, Any]: """Resolve logging settings from CLI parameters.""" invocation = build_invocation( config_path=str(config_path) if config_path is not None else None, context_choices=CONTEXT_CHOICES, logging=LoggingCliInputs( level=log_level, format=log_format, file_path=log_file, max_bytes=log_max_bytes, backup_count=log_backup_count, ), ) _, logging_settings, _ = resolve_runtime_and_logging(invocation) return invocation, logging_settings def _parse_iso_timestamp_to_epoch(value: str) -> float | None: """Parse ISO 8601 timestamp string to Unix epoch.""" if value is None: return None candidate = value.strip() if not candidate: return None if candidate.endswith("Z"): candidate = candidate[:-1] + "+00:00" try: parsed = datetime.fromisoformat(candidate) except ValueError: return None if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=UTC) else: parsed = parsed.astimezone(UTC) return parsed.timestamp() def _parse_relative_duration(value: str) -> timedelta | None: """Parse relative duration string like '30m', '1h', '2d' to timedelta.""" import re if value is None: return None pattern = re.compile(r"^\s*(\d+)([smhd])\s*$", re.IGNORECASE) match = pattern.match(value) if not match: return None amount = int(match.group(1)) unit = match.group(2).lower() multiplier = { "s": timedelta(seconds=1), "m": timedelta(minutes=1), "h": timedelta(hours=1), "d": timedelta(days=1), }.get(unit) if multiplier is None: return None return multiplier * amount def _parse_json_log_line(stripped: str) -> LogViewLine: """Parse a JSON-formatted log line.""" import json try: data = json.loads(stripped) except json.JSONDecodeError: return LogViewLine(raw=stripped, level=None, timestamp=None, json_data=None) timestamp = None for key in ("timestamp", "time", "@timestamp", "ts"): value = data.get(key) if isinstance(value, str): timestamp = _parse_iso_timestamp_to_epoch(value) if timestamp is not None: break level_value = data.get("level") or data.get("levelname") or data.get("severity") if isinstance(level_value, str): level = cli_options.LOG_LEVEL_MAP.get(level_value.strip().upper()) elif isinstance(level_value, int): level = level_value else: level = None return 
LogViewLine(raw=stripped, level=level, timestamp=timestamp, json_data=data) def _parse_text_log_line(stripped: str) -> LogViewLine: """Parse a text-formatted log line.""" timestamp = None level = None tokens = stripped.split() if tokens: timestamp = _parse_iso_timestamp_to_epoch(tokens[0].strip("[]")) for token in tokens[:3]: candidate = cli_options.LOG_LEVEL_MAP.get(token.strip("[]:,").upper()) if candidate is not None: level = candidate break return LogViewLine(raw=stripped, level=level, timestamp=timestamp, json_data=None) def _parse_log_line(line: str, format_hint: str) -> LogViewLine: """Parse a log line based on format hint.""" stripped = line.rstrip("\n") if format_hint == "json": return _parse_json_log_line(stripped) return _parse_text_log_line(stripped) def _validate_logs_show_params( tail: int, since: str | None, last: str | None, format_override: str | None, ) -> str | None: """Validate logs_show parameters. Returns normalized format or None.""" if tail < 0: raise typer.BadParameter( "Tail must be greater than or equal to zero.", param_hint="--tail" ) if since and last: raise typer.BadParameter( "Only one of --since or --last can be provided.", param_hint="--since" ) if format_override is not None: normalized = format_override.strip().lower() if normalized not in cli_options.LOG_FORMAT_CHOICES: raise typer.BadParameter( "Format must be either 'text' or 'json'.", param_hint="--format" ) return normalized return None def _resolve_start_timestamp(since: str | None, last: str | None) -> float | None: """Calculate start timestamp from --since or --last options.""" if since: timestamp = _parse_iso_timestamp_to_epoch(since) if timestamp is None: raise typer.BadParameter( "Invalid ISO 8601 timestamp.", param_hint="--since" ) return timestamp if last: duration = _parse_relative_duration(last) if duration is None: raise typer.BadParameter( "Invalid relative duration; use values like 30m, 1h, or 2d.", param_hint="--last", ) return (datetime.now(UTC) - duration).timestamp() return None def _should_include_log_entry( parsed: LogViewLine, level_threshold: int | None, normalized_level: str | None, start_timestamp: float | None, ) -> bool: """Check if log entry passes level and timestamp filters.""" if level_threshold is not None: if parsed.level is not None: if parsed.level < level_threshold: return False elif normalized_level is not None: if normalized_level not in parsed.raw.upper(): return False if start_timestamp is not None: if parsed.timestamp is None or parsed.timestamp < start_timestamp: return False return True def _display_log_entries( matched: list[LogViewLine], resolved_format: str, stdout_console: Console, ) -> None: """Display filtered log entries to stdout.""" if not matched: stdout_console.print( "[yellow]No log entries matched the supplied filters[/yellow]" ) return for entry in matched: if resolved_format == "json" and entry.json_data is not None: stdout_console.print_json(data=entry.json_data) else: stdout_console.print(entry.raw, markup=False, highlight=False) def _cmd_logs_clear( config: Path, log_file: str | None, stdout_console: Console, ) -> None: """Implementation of logs clear command.""" _, logging_settings = _resolve_logging_settings_from_cli( config_path=config, log_level=None, log_format=None, log_file=log_file, log_max_bytes=None, log_backup_count=None, ) file_path = getattr(logging_settings, "file_path", None) if not file_path: stdout_console.print( "[yellow]File logging is disabled; nothing to clear[/yellow]" ) return path = Path(file_path) 
path.parent.mkdir(parents=True, exist_ok=True) try: path.write_text("", encoding="utf-8") except OSError as exc: raise typer.BadParameter(f"Failed to clear log file: {exc}") from exc stdout_console.print(f"[green]Log file cleared at[/green] {path}") def _cmd_logs_rotate( config: Path, log_file: str | None, log_backup_count: int | None, stdout_console: Console, ) -> None: """Implementation of logs rotate command.""" _, logging_settings = _resolve_logging_settings_from_cli( config_path=config, log_level=None, log_format=None, log_file=log_file, log_max_bytes=None, log_backup_count=log_backup_count, ) file_path = getattr(logging_settings, "file_path", None) if not file_path: stdout_console.print( "[yellow]File logging is disabled; nothing to rotate[/yellow]" ) return path = Path(file_path) path.parent.mkdir(parents=True, exist_ok=True) backup_count = ( log_backup_count if log_backup_count is not None else getattr(logging_settings, "backup_count", 0) or 0 ) _rotate_logs(path, backup_count) stdout_console.print(f"[green]Log files rotated at[/green] {path}") def _cmd_logs_path( config: Path, log_file: str | None, stdout_console: Console, ) -> None: """Implementation of logs path command.""" _, logging_settings = _resolve_logging_settings_from_cli( config_path=config, log_level=None, log_format=None, log_file=log_file, log_max_bytes=None, log_backup_count=None, ) file_path = getattr(logging_settings, "file_path", None) if not file_path: stdout_console.print("[yellow]File logging is disabled[/yellow]") return resolved = Path(file_path) absolute = resolved.expanduser() try: absolute = absolute.resolve(strict=False) except OSError: absolute = absolute.absolute() stdout_console.print(f"[green]Log file (relative)[/green]: {resolved}") stdout_console.print(f"[green]Log file (absolute)[/green]: {absolute}") def _cmd_logs_show( config: Path, log_file: str | None, level: str | None, tail: int, since: str | None, last: str | None, format_override: str | None, stdout_console: Console, ) -> None: """Implementation of logs show command.""" normalized_format = _validate_logs_show_params(tail, since, last, format_override) normalized_level = ( cli_options.normalize_log_level(level) if level is not None else None ) level_threshold = ( cli_options.LOG_LEVEL_MAP.get(normalized_level) if normalized_level else None ) _, logging_settings = _resolve_logging_settings_from_cli( config_path=config, log_level=None, log_format=None, log_file=log_file, log_max_bytes=None, log_backup_count=None, ) file_path = getattr(logging_settings, "file_path", None) resolved_format = ( normalized_format or getattr(logging_settings, "format", None) or "text" ) if not file_path: stdout_console.print( "[yellow]File logging is disabled; nothing to show[/yellow]" ) return path = Path(file_path) if not validate_path_exists(path): stdout_console.print(f"[yellow]Log file not found at[/yellow] {path}") return start_timestamp = _resolve_start_timestamp(since, last) matched: list[LogViewLine] = [] with path.open("r", encoding="utf-8", errors="replace") as handle: for line in handle: parsed = _parse_log_line(line, resolved_format) if _should_include_log_entry( parsed, level_threshold, normalized_level, start_timestamp, ): matched.append(parsed) if tail and tail > 0: matched = matched[-tail:] _display_log_entries(matched, resolved_format, stdout_console) def register( app: typer.Typer, *, stdout_console: Console, ) -> None: """Register logs commands with the app.""" # Logs subcommands group logs_app = typer.Typer( help="Inspect and maintain BiRRe log 
files.", invoke_without_command=True, no_args_is_help=True, ) app.add_typer(logs_app, name="logs") @logs_app.callback(invoke_without_command=True) def logs_group_callback(ctx: typer.Context) -> None: """Display help when logs group is invoked without a subcommand.""" if ctx.invoked_subcommand is None: typer.echo(ctx.get_help()) raise typer.Exit() @logs_app.command( "clear", help="Truncate the active BiRRe log file while leaving rotated archives untouched.", ) def logs_clear( config: cli_options.ConfigPathOption = Path(DEFAULT_CONFIG_FILENAME), log_file: cli_options.LogFileOption = None, ) -> None: """Truncate the resolved log file.""" _cmd_logs_clear(config, log_file, stdout_console) @logs_app.command( "rotate", help="Perform a manual log rotation using the configured backup count.", ) def logs_rotate( config: cli_options.ConfigPathOption = Path(DEFAULT_CONFIG_FILENAME), log_file: cli_options.LogFileOption = None, log_backup_count: cli_options.LogBackupCountOption = None, ) -> None: """Rotate the active log file into numbered archives.""" _cmd_logs_rotate(config, log_file, log_backup_count, stdout_console) @logs_app.command( "path", help="Show the resolved BiRRe log file path after applying configuration overrides.", ) def logs_path( config: cli_options.ConfigPathOption = Path(DEFAULT_CONFIG_FILENAME), log_file: cli_options.LogFileOption = None, ) -> None: """Print the effective log file path.""" _cmd_logs_path(config, log_file, stdout_console) @logs_app.command( "show", help="Display recent log entries with optional level and time filtering.", ) def logs_show( config: cli_options.ConfigPathOption = Path(DEFAULT_CONFIG_FILENAME), log_file: cli_options.LogFileOption = None, level: str | None = typer.Option( None, "--level", help="Minimum log level to include (e.g. INFO, WARNING).", rich_help_panel="Filtering", ), tail: int = typer.Option( DEFAULT_LOG_TAIL_LINES, "--tail", help="Number of lines from the end of the log to display (0 to show all).", rich_help_panel="Filtering", ), since: str | None = typer.Option( None, "--since", help="Only include entries at or after the given ISO 8601 timestamp.", rich_help_panel="Filtering", ), last: str | None = typer.Option( None, "--last", help="Only include entries from the relative window (e.g. 1h, 30m).", rich_help_panel="Filtering", ), format_override: str | None = typer.Option( None, "--format", case_sensitive=False, help="Treat log entries as 'json' or 'text'. Defaults to the configured format.", rich_help_panel="Presentation", ), ) -> None: """Render log entries to stdout.""" _cmd_logs_show( config, log_file, level, tail, since, last, format_override, stdout_console, )
