Skip to main content
Glama
progress.ex (11.1 kB)
defmodule HexdocsMcp.CLI.Progress do
  @moduledoc """
  Utilities for displaying progress indicators in command-line interfaces.

  `progress_bar/2` and `workflow/1` keep their state in the process dictionary
  of the calling process, so the functions they return must be invoked from the
  same process that created them.
  """

  # Import ANSI colors and formatting functions
  import IO.ANSI,
    only: [
      green: 0,
      yellow: 0,
      reset: 0,
      bright: 0,
      cyan: 0,
      red: 0
    ]

  # Spinner animation frames
  @spinner_frames ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]

  # Delay between two spinner frames
  @spinner_interval_ms 80

  # How long to wait for the spinner process to acknowledge a stop request
  # before it is killed
  @spinner_stop_timeout_ms 1_000

  # Carriage return followed by the ANSI "erase entire line" sequence
  @clear_line "\r\e[2K"

  # Width of the progress bar in characters
  @bar_length 20

  # Minimum time between two time-triggered redraws of the progress bar
  @update_interval_ms 250

  # Percentage delta that forces a redraw regardless of elapsed time
  @percentage_change_threshold 5

  @doc ~S"""
  Displays a message, executes a function, and rewrites the line upon completion.

  The message is first printed as `message...` without a trailing newline. Once
  `func` finishes, the line is cleared and replaced by the indicator followed by
  the message and a newline.

  Returns whatever `func` returns. If `func` raises, throws or exits, the
  failure line is printed and the original error is re-raised with its
  original stacktrace.

  ## Options

    * `:success_message` - Text printed before the message on success
      (default: a green "✓")
    * `:failure_message` - Text printed before the message on failure
      (default: "[✗]" with a red "✗")

  ## Examples

      HexdocsMcp.CLI.Progress.with_spinner("Processing files", fn ->
        :timer.sleep(2000)
        {:ok, "Done!"}
      end)
      # Output initially: Processing files...
      # Output finally:   ✓ Processing files

      HexdocsMcp.CLI.Progress.with_spinner("Downloading", fn ->
        :timer.sleep(1000)
        raise "Network Error"
      end, failure_message: "[Failed]")
      # Output initially: Downloading...
      # Output finally:   [Failed] Downloading
  """
  @spec with_spinner(String.Chars.t(), (-> result), keyword()) :: result when result: var
  def with_spinner(message, func, opts \\ []) do
    success_indicator = Keyword.get(opts, :success_message, "#{green()}✓#{reset()}")
    failure_indicator = Keyword.get(opts, :failure_message, "[#{red()}✗#{reset()}]")

    # Print initial message without newline so it can be overwritten in place
    IO.write("#{bright()}#{message}...#{reset()}")

    try do
      result = func.()
      IO.write(@clear_line <> "#{success_indicator} #{bright()}#{message}#{reset()}\n")
      result
    catch
      kind, reason ->
        IO.write(@clear_line <> "#{failure_indicator} #{bright()}#{message}#{reset()}\n")
        # Re-raise the original error, preserving kind and stacktrace
        :erlang.raise(kind, reason, __STACKTRACE__)
    end
  end

  @doc """
  Displays a progress bar for a given total count.

  Returns a function that should be called with the current count. That
  function redraws the bar when needed and returns the count it was given.

  The bar is redrawn only when one of the following holds, to limit flicker:

    * the count reached the total (or the percentage reached 100),
    * at least #{@update_interval_ms}ms elapsed since the last redraw and the
      count increased,
    * the percentage moved by at least #{@percentage_change_threshold} points.

  The displayed count never decreases, and a completion line is printed
  whenever the function is called with a count greater than or equal to
  `total`. A `total` of zero or less is drawn against a total of one instead
  of raising.

  ## Examples

      progress = HexdocsMcp.CLI.Progress.progress_bar("Copying files", 100)

      Enum.each(1..100, fn i ->
        :timer.sleep(50) # Do some work
        progress.(i)
      end)
  """
  @spec progress_bar(String.Chars.t(), number()) :: (number() -> number())
  def progress_bar(message, total) do
    # Monotonic time starts at an unspecified (possibly negative) point, so the
    # "last update" is seeded relative to now rather than with a literal 0;
    # otherwise the time-based redraw condition could never fire at first.
    Process.put(:last_progress_update, System.monotonic_time(:millisecond) - @update_interval_ms)
    # Last drawn percentage, used to skip redraws that would change nothing
    Process.put(:last_progress_percentage, -1)
    # Last displayed count, used to keep the display from moving backwards
    Process.put(:last_displayed_count, 0)
    # Message and total are stored so state can be tracked across multiple calls
    Process.put(:current_progress_message, message)
    Process.put(:current_progress_total, total)

    fn count ->
      now = System.monotonic_time(:millisecond)
      last_update = Process.get(:last_progress_update, now - @update_interval_ms)

      # A different stored message means another bar was created in between:
      # reset the shared state so this bar starts from scratch.
      if Process.get(:current_progress_message) != message do
        Process.put(:last_progress_percentage, -1)
        Process.put(:last_displayed_count, 0)
        Process.put(:current_progress_message, message)
        Process.put(:current_progress_total, total)
      end

      # Use the stored total so batches reported through separate calls share
      # one total; clamp to 1 so an empty job never divides by zero.
      safe_total = max(1, Process.get(:current_progress_total, total))

      # Ensure count never decreases - this prevents "jumping backward"
      last_displayed_count = Process.get(:last_displayed_count, 0)
      display_count = max(0, min(safe_total, max(count, last_displayed_count)))

      # Multiply before dividing: `count / total * 100` loses precision
      # (29 / 100 * 100 truncates to 28). Result is clamped to 0..100.
      percentage = min(100, max(0, trunc(display_count * 100 / safe_total)))
      last_percentage = Process.get(:last_progress_percentage, -1)

      if count >= total || percentage >= 100 ||
           (now - last_update >= @update_interval_ms && count > last_displayed_count) ||
           abs(percentage - last_percentage) >= @percentage_change_threshold do
        draw_progress_bar(message, total, safe_total, display_count, percentage)

        # If we're done, add a completion message on a new line
        write_completion_message(count, total, message)

        Process.put(:last_progress_update, now)
        Process.put(:last_progress_percentage, percentage)
        Process.put(:last_displayed_count, display_count)
      end

      count
    end
  end

  @doc """
  Creates a single-line workflow with animated spinner and checkmarks.

  Returns a tuple of two functions:

    * `start_stage.(stage_name)` - marks the previous stage (if any) as
      completed and starts `stage_name` with an animated spinner
    * `complete_workflow.()` - stops the spinner, marks the current stage as
      completed and prints the final line with a trailing checkmark

  Both functions must be called from the process that called `workflow/1`.
  The spinner runs in a linked process that is stopped before the line is
  redrawn by the next stage or by completion.

  ## Example

      {next_stage, complete} =
        HexdocsMcp.CLI.Progress.workflow(["Fetching", "Converting", "Processing"])

      next_stage.("Fetching")
      # do fetching work...
      next_stage.("Converting")
      # do conversion work...
      next_stage.("Processing")
      # do processing work...
      complete.()
  """
  @spec workflow([String.Chars.t()]) :: {(String.Chars.t() -> any()), (-> any())}
  def workflow(stages) do
    # A previous, never-completed workflow may have left a spinner running
    stop_spinner()

    Process.put(:workflow_stages, stages)
    Process.put(:workflow_completed_stages, [])
    Process.put(:workflow_current_stage, nil)
    Process.put(:workflow_spinner_pid, nil)

    start_stage = fn stage_name ->
      stop_spinner()
      complete_current_stage()
      Process.put(:workflow_current_stage, stage_name)

      # The spinner gets a snapshot of the state: it runs in its own process
      # and therefore cannot read this process's dictionary.
      state = workflow_state()
      pid = spawn_link(fn -> animate_spinner(state) end)
      Process.put(:workflow_spinner_pid, pid)

      render_workflow_line(state, false)
    end

    complete_workflow = fn ->
      stop_spinner()
      complete_current_stage()

      # Render final line with checkmark
      render_workflow_line(workflow_state(), true)
    end

    {start_stage, complete_workflow}
  end

  # Draws one frame of the progress bar on the current terminal line.
  defp draw_progress_bar(message, total, safe_total, display_count, percentage) do
    # Number of filled segments, kept within 0..@bar_length
    filled_length = max(0, min(@bar_length, trunc(@bar_length * display_count / safe_total)))
    unfilled_length = max(0, @bar_length - filled_length)

    bar = String.duplicate("█", filled_length) <> String.duplicate("░", unfilled_length)

    percent_str = String.pad_leading("#{percentage}%", 4)
    count_str = String.pad_leading("#{display_count}", String.length("#{total}"))

    # Clearing the line first makes trailing padding unnecessary
    IO.write(
      @clear_line <>
        "#{bright()}#{message}#{reset()}: " <>
        "#{cyan()}#{bar}#{reset()} " <>
        "#{yellow()}#{percent_str}#{reset()} " <>
        "(#{bright()}#{count_str}#{reset()}/#{total})"
    )
  end

  # Snapshot of the workflow state stored in the calling process's dictionary.
  defp workflow_state do
    %{
      stages: Process.get(:workflow_stages, []),
      completed_stages: Process.get(:workflow_completed_stages, []),
      current_stage: Process.get(:workflow_current_stage)
    }
  end

  # Moves the current stage (if any) to the list of completed stages.
  defp complete_current_stage do
    if current = Process.get(:workflow_current_stage) do
      completed = Process.get(:workflow_completed_stages, [])
      Process.put(:workflow_completed_stages, [current | completed])
    end
  end

  # Stops the spinner process, if one is registered, and waits until it is gone
  # so its output cannot interleave with the next render.
  #
  # An exit signal with reason `:normal` is ignored by a process that does not
  # trap exits, so the spinner is asked to stop via a message instead and only
  # killed (after unlinking, to spare the caller) if it does not react in time.
  defp stop_spinner do
    case Process.get(:workflow_spinner_pid) do
      pid when is_pid(pid) ->
        ref = Process.monitor(pid)
        send(pid, :stop)

        receive do
          {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
        after
          @spinner_stop_timeout_ms ->
            Process.demonitor(ref, [:flush])
            Process.unlink(pid)
            Process.exit(pid, :kill)
        end

        Process.put(:workflow_spinner_pid, nil)

      _ ->
        :ok
    end
  end

  # Spinner loop, run in its own process: redraws the workflow line with the
  # next frame every @spinner_interval_ms until a `:stop` message arrives.
  defp animate_spinner(state) do
    @spinner_frames
    |> Stream.cycle()
    |> Enum.reduce_while(:ok, fn frame, :ok ->
      receive do
        :stop -> {:halt, :ok}
      after
        @spinner_interval_ms ->
          render_workflow_line(state, false, frame)
          {:cont, :ok}
      end
    end)
  end

  # Clears the terminal line and writes the workflow line for `state`; when
  # `completed` is true a trailing checkmark and a newline are added.
  defp render_workflow_line(state, completed, spinner_frame \\ nil) do
    line =
      Enum.map_join(state.stages, " → ", fn stage ->
        format_stage(stage, state, completed, spinner_frame)
      end)

    # Clear and redraw in a single write so the line is replaced in one go
    IO.write([@clear_line, maybe_add_completion_mark(line, completed)])

    # Add newline if completed
    if completed do
      IO.write("\n")
    end
  end

  # Formats one stage: checkmark when completed, spinner when it is the current
  # stage of an unfinished workflow, plain otherwise.
  defp format_stage(stage, state, completed, spinner_frame) do
    cond do
      stage in state.completed_stages ->
        "#{green()}✓#{reset()} #{stage}"

      stage == state.current_stage and not completed ->
        format_current_stage(stage, spinner_frame)

      true ->
        " #{stage}"
    end
  end

  # Formats the running stage, with the spinner frame when one is given.
  defp format_current_stage(stage, spinner_frame) do
    if spinner_frame do
      "#{cyan()}#{spinner_frame}#{reset()} #{stage}"
    else
      " #{stage}"
    end
  end

  # Appends the final green checkmark once the whole workflow is completed.
  defp maybe_add_completion_mark(line, completed) do
    if completed do
      "#{line} #{green()}✓#{reset()}"
    else
      line
    end
  end

  # Prints the "<message> completed" line once the count reached the total.
  defp write_completion_message(count, total, message) do
    if count >= total do
      IO.write("\n#{green()}✓#{reset()} #{bright()}#{message}#{reset()} completed\n")
    end
  end
end

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bradleygolden/hexdocs-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.