CliManager.py
# TODO: Implement prefixes for Panels, similar to `get_prefixed_logger()`.

# archive_agent/core/CliManager.py
# Copyright © 2025 Dr.-Ing. Paul Wilhelm <paul@wilhelm.dev>
# This file is part of Archive Agent. See LICENSE for details.

import json
import logging
from logging import Logger
import queue
import threading
from contextlib import contextmanager
from logging import Handler, LogRecord
from typing import Callable, List, Dict, cast, Optional, Any

import typer

from rich.console import Console, RenderableType, Group
from rich.live import Live
from rich.logging import RichHandler
from rich.panel import Panel
from rich.pretty import Pretty
from rich.table import Table

from qdrant_client.models import ScoredPoint

from archive_agent.ai.AiResult import AiResult
from archive_agent.ai.query.AiQuery import QuerySchema
from archive_agent.ai.vision.AiVisionSchema import VisionSchema
from archive_agent.util.PrefixedLogger import PrefixedLogger
from archive_agent.util.format import format_chunk_brief, get_point_reference_info
from archive_agent.db.QdrantSchema import parse_payload


class QueueHandler(Handler):
    """
    A logging handler that puts records into a queue.
    """

    def __init__(self, record_queue: queue.Queue):
        """
        Initialize the handler.
        :param record_queue: The queue to put records into.
        """
        super().__init__()
        self.record_queue = record_queue

    def emit(self, record: LogRecord):
        """
        Emit a record.
        """
        self.record_queue.put(record)


def _printer_thread_target(
    live: Live,
    q: queue.Queue,
    rich_handler: RichHandler,
    get_renderable: Callable[[], RenderableType]
):
    """
    The target function for the printer thread.
    Pulls items from a queue and prints them to the live console.
    Also, periodically refreshes the live display.
    """
    logger = logging.getLogger("ai.cli.printer")

    while True:
        try:
            item = q.get(timeout=0.1)  # Timeout to allow for periodic refresh

            if item is None:  # Sentinel value to signal thread to exit
                break

            if isinstance(item, LogRecord):
                rich_handler.handle(item)
            else:
                try:
                    live.console.print(item)
                except Exception as e:
                    logger.error(f"Failed to render output: {type(e).__name__}: {e}")

            q.task_done()

        except queue.Empty:
            pass  # Timeout occurred, just refresh the display

        except Exception as e:
            logger.error(f"Printer thread error: {type(e).__name__}: {e}")

        try:
            live.update(get_renderable())
        except Exception as e:
            logger.error(f"Live update error: {type(e).__name__}: {e}")


class CliManager:
    """
    CLI manager.
    """

    VERBOSE_LOADER: bool = False  # enabled by --verbose flag
    VERBOSE_CHUNK: bool = False  # enabled by --verbose flag
    VERBOSE_RERANK: bool = False  # enabled by --verbose flag
    VERBOSE_EMBED: bool = False  # enabled by --verbose flag
    VERBOSE_QUERY: bool = False  # enabled by --verbose flag
    VERBOSE_VISION: bool = False  # enabled by --verbose flag
    VERBOSE_RETRIEVAL: bool = False  # enabled by --verbose flag
    VERBOSE_USAGE: bool = False  # enabled by --verbose flag

    def __init__(self, verbose: bool = False):
        """
        Initialize CLI manager.
        :param verbose: Verbosity switch.
        """
        CliManager.VERBOSE_LOADER = verbose
        CliManager.VERBOSE_CHUNK = verbose
        CliManager.VERBOSE_RERANK = verbose
        CliManager.VERBOSE_EMBED = verbose
        CliManager.VERBOSE_QUERY = verbose
        CliManager.VERBOSE_VISION = verbose
        CliManager.VERBOSE_RETRIEVAL = verbose
        CliManager.VERBOSE_USAGE = verbose

        self.logger = logging.getLogger()
        self.console = Console(markup=False)

        self._output_queue: Optional[queue.Queue] = None

        self.ai_usage_stats = {
            "chunk": 0,
            "embed": 0,
            "rerank": 0,
            "query": 0,
            "vision": 0
        }
        self.lock = threading.Lock()

    def get_prefixed_logger(self, prefix: str) -> Logger:
        """
        Get prefixed logger.
        :param prefix: Prefix.
        :return logger: Logger.
        """
        return PrefixedLogger(prefix, self.logger)

    @contextmanager
    def live_context(self, live: Live, get_renderable: Callable[[], RenderableType]) -> Any:
        """
        A context manager to correctly handle logging and printing during a Live display.
        It finds the existing RichHandler, redirects its output to the live console,
        and funnels all log messages through a queue to a dedicated printer thread.
        """
        q = queue.Queue()
        self._output_queue = q

        # Find the existing RichHandler from the configured logger.
        original_rich_handler = None
        for handler in self.logger.handlers:
            if isinstance(handler, RichHandler):
                original_rich_handler = handler
                break

        if not original_rich_handler:
            raise RuntimeError("RichHandler not found on the logger. Logging is not configured correctly.")

        # Store the original console and temporarily redirect the handler's output to the live console.
        original_console = original_rich_handler.console
        original_rich_handler.console = live.console

        # Start the printer thread, passing it the *original*, now-redirected handler.
        printer_thread = threading.Thread(
            target=_printer_thread_target,
            args=(live, q, original_rich_handler, get_renderable)
        )
        printer_thread.daemon = True
        printer_thread.start()

        # Temporarily replace the main logger's handlers with the QueueHandler to funnel all logs.
        queue_handler = QueueHandler(q)
        original_handlers = self.logger.handlers[:]
        self.logger.handlers = [queue_handler]

        try:
            yield
        finally:
            # Signal the printer thread to exit and wait for it to finish processing the queue.
            q.put(None)
            printer_thread.join()

            # Restore the original logger handlers.
            self.logger.handlers = original_handlers
            self._output_queue = None

            # IMPORTANT: Restore the original console to the handler.
            if original_rich_handler:
                original_rich_handler.console = original_console

    def _print(self, renderable: RenderableType):
        """
        Internal helper to print to the correct destination (queue or default console).
        """
        if self._output_queue:
            self._output_queue.put(renderable)
        else:
            self.console.print(renderable)

    def update_ai_usage(self, stats: Dict[str, int]):
        """
        Thread-safe method to update AI usage statistics.
        :param stats: A dictionary with token counts to add.
        """
        with self.lock:
            for category, value in stats.items():
                if category in self.ai_usage_stats:
                    self.ai_usage_stats[category] += value

    def get_ai_usage_renderable(self) -> Table:
        """
        Create a Rich Table to display AI usage statistics in a transposed layout.
        :return: A Rich Table object.
        """
        table = Table(
            title="AI Usage",
            show_header=False,
            show_edge=False,
            pad_edge=False,
            box=None,
            padding=(0, 2)
        )

        with self.lock:
            categories = [c.capitalize() for c in self.ai_usage_stats.keys()]
            values = [str(v) for v in self.ai_usage_stats.values()]

        # Define columns based on number of categories + 1 for the labels
        table.add_column(style="cyan", no_wrap=True)
        for _ in categories:
            table.add_column(justify="right", style="magenta")

        table.add_row("Category", *categories)
        table.add_row("Tokens", *values)

        return table

    @contextmanager
    def progress_context(self, progress_manager) -> Any:
        """
        A context manager for displaying hierarchical progress with live logging.
        Uses the provided ProgressManager from ContextManager.
        :param progress_manager: ProgressManager instance from ContextManager.
        """
        def get_renderable() -> Group:
            return Group(
                progress_manager.get_tree_renderable(),
                self.get_ai_usage_renderable()
            )

        with Live(get_renderable(), screen=False, redirect_stderr=False, transient=True) as live:
            with self.live_context(live, get_renderable):
                yield progress_manager, None  # yield ProgressManager directly

    def format_json(self, text: str) -> None:
        """
        Format text as JSON.
        :param text: Text.
        """
        try:
            data = json.loads(text)
            pretty = Pretty(data, expand_all=True)
            self._print(Panel(pretty, title="Structured output", style="blue", border_style="blue"))
        except json.JSONDecodeError:
            self._print(Panel(f"{text}", title="Raw output", style="red", border_style="red"))

    def prompt(self, message: str, is_cmd: bool, **kwargs) -> str:
        """
        Prompt user with message.
        :param message: Message.
        :param is_cmd: Enables "> " command style prompt.
        :param kwargs: Additional arguments for typer.prompt.
        :return: User input.
        """
        if is_cmd:
            self.logger.info(f"⚡ Archive Agent: {message}")
            return typer.prompt("", prompt_suffix="> ", **kwargs)
        else:
            self.logger.info(f"⚡ Archive Agent")
            return typer.prompt(message, prompt_suffix="", **kwargs)

    def format_ai_chunk(
        self,
        callback: Callable[[], AiResult],
        line_numbered_text: str,
    ) -> AiResult:
        """
        Format text to be chunked and AI result of chunk callback.
        :param callback: Chunk callback returning AI result.
        :param line_numbered_text: Text with line numbers.
        :return: AI result.
        """
        if CliManager.VERBOSE_CHUNK:
            self._print(Panel(f"{line_numbered_text}", title="Text", style="blue", border_style="blue"))

        self.logger.info("✨ Awaiting AI chunking…")
        result: AiResult = callback()

        self.update_ai_usage({"chunk": result.total_tokens})

        if CliManager.VERBOSE_USAGE:
            self.logger.info(f"Used ({result.total_tokens}) AI API token(s) for chunking")

        if CliManager.VERBOSE_CHUNK:
            self.format_json(result.output_text)

        return result

    def format_ai_rerank(
        self,
        callback: Callable[[], AiResult],
        indexed_chunks: Dict[int, str],
    ) -> AiResult:
        """
        Format chunks to be reranked and AI result of rerank callback.
        :param callback: Rerank callback returning AI result.
        :param indexed_chunks: Indexed chunks.
        :return: AI result.
        """
        if CliManager.VERBOSE_RERANK:
            indexed_chunks_str = "\n".join([
                f"{index:>3} : {format_chunk_brief(chunk=chunk)}"
                for index, chunk in indexed_chunks.items()
            ])
            self._print(Panel(f"{indexed_chunks_str}", title="Indexed Chunks", style="blue", border_style="blue"))

        self.logger.info("✨ Awaiting AI reranking…")
        result: AiResult = callback()

        self.update_ai_usage({"rerank": result.total_tokens})

        if CliManager.VERBOSE_USAGE:
            self.logger.info(f"Used ({result.total_tokens}) AI API token(s) for reranking")

        if CliManager.VERBOSE_RERANK:
            self.format_json(result.output_text)

        return result

    def format_ai_embed(
        self,
        callback: Callable[[], AiResult],
        text: str,
    ) -> AiResult:
        """
        Format text to be embedded.
        :param callback: Embed callback returning AI result.
        :param text: Text.
        :return: AI result.
        """
        if CliManager.VERBOSE_EMBED:
            self.format_chunk(text)

        self.logger.info("✨ Awaiting AI embedding…")
        result: AiResult = callback()

        self.update_ai_usage({"embed": result.total_tokens})

        if CliManager.VERBOSE_USAGE:
            self.logger.info(f"Used ({result.total_tokens}) AI API token(s) for embedding")

        return result

    def format_ai_query(
        self,
        callback: Callable[[], AiResult],
        prompt: str,
    ) -> AiResult:
        """
        Format prompt and AI result of query callback.
        :param callback: Query callback returning AI result.
        :param prompt: Prompt.
        :return: AI result.
        """
        if CliManager.VERBOSE_QUERY:
            self._print(Panel(f"{prompt}", title="Query", style="magenta", border_style="magenta"))

        self.logger.info("✨ Awaiting AI response…")
        result: AiResult = callback()

        self.update_ai_usage({"query": result.total_tokens})

        if CliManager.VERBOSE_USAGE:
            self.logger.info(f"Used ({result.total_tokens}) AI API token(s) for query")

        if CliManager.VERBOSE_QUERY:
            self.format_json(result.output_text)

        return result

    def format_ai_vision(
        self,
        callback: Callable[[], AiResult],
    ) -> AiResult:
        """
        Format AI result of vision callback.
        :param callback: Vision callback returning AI result.
        :return: AI result.
        """
        if CliManager.VERBOSE_VISION:
            self.logger.info("✨ Awaiting AI vision…")

        result: AiResult = callback()

        self.update_ai_usage({"vision": result.total_tokens})

        if CliManager.VERBOSE_VISION:
            vision_result = cast(VisionSchema, result.parsed_schema)
            if vision_result.is_rejected:
                self._print(
                    Panel(
                        f"{vision_result.rejection_reason}",
                        title="Vision rejected", style="red", border_style="red"
                    )
                )
            else:
                self.format_json(result.output_text)

        if CliManager.VERBOSE_USAGE:
            self.logger.info(f"Used ({result.total_tokens}) AI API token(s) for vision")

        return result

    def format_point(self, point: ScoredPoint) -> None:
        """
        Format point.
        :param point: Point.
        """
        self.logger.info(f"({point.score * 100:>6.2f} %) match: {get_point_reference_info(self.logger, point, verbose=True)}")

    def format_retrieved_points(self, points: List[ScoredPoint]) -> None:
        """
        Format chunks of retrieved points.
        :param points: Retrieved points.
        """
        if len(points) == 0:
            self.logger.info(f"⚠️ No retrieved results")
            return

        self.logger.info(f"✅ Retrieved ({len(points)}) chunk(s):")
        for point in points:
            assert point.payload is not None
            self.format_point(point)
            if CliManager.VERBOSE_RETRIEVAL:
                self.format_chunk(parse_payload(point.payload).chunk_text)

    def format_reranked_points(self, points: List[ScoredPoint]) -> None:
        """
        Format chunks of reranked points.
        :param points: Reranked points.
        """
        if len(points) == 0:
            self.logger.info(f"⚠️ No reranked results")
            return

        self.logger.info(f"✅ Reranked and limited down to ({len(points)}) chunk(s):")
        for point in points:
            assert point.payload is not None
            self.format_point(point)
            if CliManager.VERBOSE_RERANK:
                self.format_chunk(parse_payload(point.payload).chunk_text)

    def format_expanded_deduped_points(self, points: List[ScoredPoint]) -> None:
        """
        Format chunks of expanded and deduplicated points.
        :param points: Expanded and deduplicated points.
        """
        if len(points) == 0:
            self.logger.info(f"⚠️ No expanded and deduplicated results")
            return

        self.logger.info(f"✅ Expanded and deduplicated down to ({len(points)}) chunk(s):")
        for point in points:
            assert point.payload is not None
            self.format_point(point)
            if CliManager.VERBOSE_RERANK:
                self.format_chunk(parse_payload(point.payload).chunk_text)

    def format_chunk(self, chunk: str) -> None:
        """
        Format chunk.
        :param chunk: Chunk.
        """
        self._print(Panel(f"{chunk}", title="Chunk", style="orange3", border_style="orange3"))

    def format_question(self, question: str) -> None:
        """
        Format question.
        :param question: Question.
        """
        self._print(Panel(f"{question}", title="Question", style="magenta", border_style="magenta"))

    def format_query(self, query_result: QuerySchema, answer_text: str):
        """
        Format query.
        :param query_result: Query result.
        :param answer_text: Formatted answer.
        """
        if query_result.is_rejected:
            self._print(
                Panel(f"{query_result.rejection_reason}", title="Query rejected", style="red", border_style="red")
            )
        else:
            self._print(Panel(f"{answer_text}", title="Answer", style="green", border_style="green"))

    def usage(self):
        """
        Show AI token usage.
        """
        with self.lock:
            if any(self.ai_usage_stats.values()):
                self.logger.info(
                    f"Used AI API token(s): "
                    f"({self.ai_usage_stats['chunk']}) chunking, "
                    f"({self.ai_usage_stats['embed']}) embedding, "
                    f"({self.ai_usage_stats['rerank']}) reranking, "
                    f"({self.ai_usage_stats['query']}) query, "
                    f"({self.ai_usage_stats['vision']}) vision"
                )
            else:
                self.logger.info("No AI API tokens used")

        self.console.print(self.get_ai_usage_renderable())

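A minimal usage sketch follows. It is not part of CliManager.py: the logging setup and the `DummyProgressManager` stand-in are assumptions. `live_context()` raises a RuntimeError unless a RichHandler is installed on the logger, and `progress_context()` only ever calls `get_tree_renderable()` on the object it receives, so the hypothetical stand-in mimics just that one method of the real ProgressManager from ContextManager, whose interface is not shown in this file.

# Hypothetical usage sketch; not part of CliManager.py.

import logging
import time

from rich.logging import RichHandler
from rich.text import Text

from archive_agent.core.CliManager import CliManager


class DummyProgressManager:
    """Hypothetical stand-in for the real ProgressManager from ContextManager."""

    def get_tree_renderable(self) -> Text:
        # Only this method is required by progress_context()'s get_renderable().
        return Text("Processing…")


# live_context() searches the logger's handlers for a RichHandler and raises
# RuntimeError if none is found, so install one before entering the context.
logging.basicConfig(level=logging.INFO, handlers=[RichHandler()])

cli = CliManager(verbose=True)

# While the Live display is active, log records and Panels are funneled
# through the internal queue to the dedicated printer thread.
with cli.progress_context(DummyProgressManager()) as (progress, _):
    cli.logger.info("Work happens here; log output flows through the queue.")
    cli.update_ai_usage({"embed": 123})  # thread-safe token accounting
    time.sleep(0.5)

cli.usage()  # logs token totals and prints the "AI Usage" table

The `as (progress, _)` unpacking mirrors the `(progress_manager, None)` tuple that `progress_context()` yields.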