Skip to main content
Glama
WatchlistManager.py12.7 kB
# Copyright © 2025 Dr.-Ing. Paul Wilhelm <paul@wilhelm.dev> # This file is part of Archive Agent. See LICENSE for details. import typer import os from pathlib import Path from typing import Dict, Any from archive_agent.core.CliManager import CliManager from archive_agent.data.FileData import FileData from archive_agent.util.StorageManager import StorageManager from archive_agent.util.format import format_file from archive_agent.watchlist.pattern import validate_pattern, resolve_pattern from archive_agent.core.lock import file_lock TrackedFiles = Dict[str, Dict[str, Any]] # {file_path: file_meta} class WatchlistManager(StorageManager): """ Watchlist manager. """ WATCHLIST_VERSION = 'watchlist_version' DEFAULT_WATCHLIST = { WATCHLIST_VERSION: 2, 'included': [], 'excluded': [], 'tracked': {}, } DIFF_NONE = 'None' DIFF_ADDED = 'added' DIFF_REMOVED = 'removed' DIFF_CHANGED = 'changed' DIFF_OPTIONS = [DIFF_NONE, DIFF_ADDED, DIFF_REMOVED, DIFF_CHANGED] def __init__(self, cli: CliManager, settings_path: Path, profile_name: str) -> None: """ Initialize watchlist manager. :param cli: CLI manager. :param settings_path: Settings path. :param profile_name: Profile name. """ self.cli = cli file_path = settings_path / profile_name / "watchlist.json" StorageManager.__init__(self, logger=self.cli.logger, file_path=file_path, default=self.DEFAULT_WATCHLIST) def upgrade(self) -> bool: """ Upgrade data. :return: True if data upgraded, False otherwise. """ upgraded = False version = self.data.get(self.WATCHLIST_VERSION, 1) if version < 2: self.cli.logger.warning(f"Upgrading watchlist (v2): {format_file(self.file_path)}") self.data[self.WATCHLIST_VERSION] = 2 upgraded = True return upgraded def validate(self) -> bool: """ Validate data. :return: True if data is valid, False otherwise. """ if set(self.data['included']) & set(self.data['excluded']): self.cli.logger.error("Overlapping included and excluded patterns") return False if any(file_meta['diff'] not in self.DIFF_OPTIONS for file_meta in self.data['tracked'].values()): self.cli.logger.error("Invalid diff option encountered") return False return True @file_lock("archive_agent_watchlist") def include(self, pattern) -> None: """ Add included pattern. :param pattern: Pattern. """ pattern = validate_pattern(pattern) if pattern in self.data['included']: self.cli.logger.info(f"Already included pattern:") self.cli.logger.info(f"- {pattern}") elif pattern in self.data['excluded']: self.cli.logger.info(f"Included previously excluded pattern:") self.cli.logger.info(f"- {pattern}") self.data['excluded'].remove(pattern) self.data['included'] = list(set(self.data['included']) | {pattern}) self.save() else: self.cli.logger.info(f"New included pattern:") self.cli.logger.info(f"- {pattern}") self.data['included'] = list(set(self.data['included']) | {pattern}) self.save() @file_lock("archive_agent_watchlist") def exclude(self, pattern) -> None: """ Add excluded pattern. :param pattern: Pattern. """ pattern = validate_pattern(pattern) if pattern in self.data['excluded']: self.cli.logger.info(f"Already excluded pattern:") self.cli.logger.info(f"- {pattern}") elif pattern in self.data['included']: self.cli.logger.info(f"Excluded previously included pattern:") self.cli.logger.info(f"- {pattern}") self.data['included'].remove(pattern) self.data['excluded'] = list(set(self.data['excluded']) | {pattern}) self.save() else: self.cli.logger.info(f"New excluded pattern:") self.cli.logger.info(f"- {pattern}") self.data['excluded'] = list(set(self.data['excluded']) | {pattern}) self.save() @file_lock("archive_agent_watchlist") def remove(self, pattern) -> None: """ Remove previously included / excluded pattern. :param pattern: Pattern. """ pattern = validate_pattern(pattern) if pattern in self.data['included']: self.cli.logger.info(f"Removed included pattern:") self.cli.logger.info(f"- {pattern}") self.data['included'].remove(pattern) self.save() elif pattern in self.data['excluded']: self.cli.logger.info(f"Removed excluded pattern:") self.cli.logger.info(f"- {pattern}") self.data['excluded'].remove(pattern) self.save() else: self.cli.logger.warning(f"No existing rule for pattern:") self.cli.logger.info(f"{pattern}") @file_lock("archive_agent_watchlist") def patterns(self) -> None: """ Show the list of included / excluded patterns. """ if len(self.data['included']) > 0: self.cli.logger.info(f"({len(self.data['included'])}) included pattern(s):") for included_pattern in self.data['included']: self.cli.logger.info(f"- {included_pattern}") else: self.cli.logger.info("(0) included pattern(s)") if len(self.data['excluded']) > 0: self.cli.logger.info(f"({len(self.data['excluded'])}) excluded pattern(s):") for excluded_pattern in self.data['excluded']: self.cli.logger.info(f"- {excluded_pattern}") else: self.cli.logger.info("(0) excluded pattern(s)") def get_included_patterns(self) -> list[str]: """ Get the list of included patterns. :return: List of included patterns. """ return self.data['included'] def get_excluded_patterns(self) -> list[str]: """ Get the list of excluded patterns. :return: List of excluded patterns. """ return self.data['excluded'] @file_lock("archive_agent_watchlist") def track(self) -> int: """ Resolve all patterns and track changed files. :return: Total number of added, removed, changes files. """ self.cli.logger.info(f"Resolving ({len(self.data['included'])}) included / ({len(self.data['excluded'])}) excluded pattern(s):") included_files = [] for included_pattern in self.data['included']: included_files += resolve_pattern(included_pattern) included_files = list(set(included_files)) self.cli.logger.info(f"- Matched ({len(included_files)}) unique included file(s)") excluded_files = [] for excluded_pattern in self.data['excluded']: excluded_files += resolve_pattern(excluded_pattern) excluded_files = list(set(excluded_files)) self.cli.logger.info(f"- Matched ({len(excluded_files)}) unique excluded file(s)") tracked_files_old = self.data['tracked'].keys() tracked_files_new = sorted([file for file in included_files if file not in excluded_files]) self.cli.logger.info(f"- Ignoring ({len(included_files) - len(tracked_files_new)}) file(s)") self.cli.logger.info(f"Tracking ({len(tracked_files_new)}) file(s):") added_files = [file for file in tracked_files_new if file not in tracked_files_old] removed_files = [file for file in tracked_files_old if file not in tracked_files_new] possibly_changed_files = [file for file in tracked_files_new if file not in added_files + removed_files] tracked_dict_old = self.data['tracked'] tracked_dict_new = { file: { 'size': os.path.getsize(file), 'mtime': os.path.getmtime(file), 'diff': self.DIFF_NONE, } for file in tracked_files_new } changed_files = [file for file in possibly_changed_files if tracked_dict_new[file] != tracked_dict_old[file]] for file in added_files: tracked_dict_new[file]['diff'] = self.DIFF_ADDED for file in removed_files: tracked_dict_new[file] = { 'size': 0, 'mtime': 0, 'diff': self.DIFF_REMOVED, } for file in changed_files: tracked_dict_new[file]['diff'] = self.DIFF_CHANGED unchanged_count = len(tracked_files_new) - len(added_files) - len(changed_files) self.cli.logger.info(f"- ({len(added_files)}) added file(s)") self.cli.logger.info(f"- ({len(removed_files)}) removed file(s)") self.cli.logger.info(f"- ({len(changed_files)}) changed file(s)") self.cli.logger.info(f"- ({unchanged_count}) unchanged file(s)") self.data['tracked'] = tracked_dict_new self.save() return len(added_files) + len(removed_files) + len(changed_files) def get_tracked_files(self) -> TrackedFiles: """ Get the list of tracked files. :return: List of tracked files. """ return self.data['tracked'] @file_lock("archive_agent_watchlist") def list(self) -> None: """ Show the list of tracked files. """ if len(self.data['tracked']) > 0: self.cli.logger.info(f"({len(self.data['tracked'])}) tracked file(s):") for file in self.data['tracked'].keys(): self.cli.logger.info(f"- {file}") else: self.cli.logger.info("(0) tracked file(s)") def get_diff_files(self, diff_option: str) -> TrackedFiles: """ Get the list of tracked files filtered by diff option. :param diff_option: Diff option. :return: List of tracked files filtered by diff option. """ return { file_path: file_meta for file_path, file_meta in self.data['tracked'].items() if file_meta['diff'] == diff_option } @file_lock("archive_agent_watchlist") def diff(self) -> None: """ Show the list of changed files. """ added_files = self.get_diff_files(self.DIFF_ADDED) changed_files = self.get_diff_files(self.DIFF_CHANGED) removed_files = self.get_diff_files(self.DIFF_REMOVED) if len(added_files) > 0: self.cli.logger.info(f"({len(added_files)}) added file(s):") for file in added_files.keys(): self.cli.logger.info(f"- ADDED {file}") else: self.cli.logger.info("(0) added file(s)") if len(changed_files) > 0: self.cli.logger.info(f"({len(changed_files)}) changed file(s):") for file in changed_files.keys(): self.cli.logger.info(f"- CHANGED {file}") else: self.cli.logger.info("(0) changed file(s)") if len(removed_files) > 0: self.cli.logger.info(f"({len(removed_files)}) removed file(s):") for file in removed_files.keys(): self.cli.logger.info(f"- REMOVED {file}") else: self.cli.logger.info("(0) removed file(s)") def diff_mark_resolved(self, file_data: FileData) -> None: """ Mark file in diff as resolved. If the file was deleted, untrack it completely. :param file_data: File data. """ if file_data.file_path not in self.data['tracked']: self.cli.logger.error(f"Untracked {format_file(file_data.file_path)}") raise typer.Exit(code=1) if self.data['tracked'][file_data.file_path]['diff'] == self.DIFF_NONE: self.cli.logger.error(f"Already marked as resolved: {format_file(file_data.file_path)}") raise typer.Exit(code=1) if self.data['tracked'][file_data.file_path]['diff'] == self.DIFF_REMOVED: del self.data['tracked'][file_data.file_path] else: self.data['tracked'][file_data.file_path]['diff'] = self.DIFF_NONE self.save() def isEmpty(self) -> bool: """ Check for blank watchlist. :return: True if no patterns are included or excluded, False otherwise. """ return len(self.data['included']) == 0 and len(self.data['excluded']) == 0

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shredEngineer/Archive-Agent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server