OpenAI MCP Server
by arthurcolle
#!/usr/bin/env python3
# claude_code/lib/ui/tool_visualizer.py
"""Real-time tool execution visualization."""
import logging
import time
import json
from typing import Dict, List, Any, Optional
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, BarColumn, TextColumn, TimeElapsedColumn
from rich.table import Table
from rich.box import ROUNDED
from rich.text import Text
from rich.live import Live
from rich.layout import Layout
from rich.syntax import Syntax
from ..tools.base import ToolResult
logger = logging.getLogger(__name__)
class ToolCallVisualizer:
    """Visualizes tool calls in real-time.

    Tracks in-flight tool calls and a short history of finished ones, and
    renders both into a two-row Rich ``Layout`` shown through a manually
    refreshed ``Live`` display.
    """

    def __init__(self, console: Console):
        """Initialize the tool call visualizer.

        Args:
            console: Rich console instance
        """
        self.console = console
        # call id -> {"tool_name", "parameters", "start_time", "progress"}
        self.active_calls: Dict[str, Dict[str, Any]] = {}
        self.completed_calls: List[Dict[str, Any]] = []
        self.layout = self._create_layout()
        self.live = Live(self.layout, console=console, refresh_per_second=4, auto_refresh=False)
        self.max_completed_calls = 5
        # Keep track of recent successful tool results for routines
        self.recent_tool_results: List[ToolResult] = []
        self.max_recent_results = 20  # Maximum number of recent results to track

    def _create_layout(self) -> Layout:
        """Create the layout for the tool call visualization.

        Returns:
            Layout with an "active" and a "completed" row
        """
        layout = Layout()
        layout.split(
            Layout(name="active", size=3),
            Layout(name="completed", size=3),
        )
        return layout

    @staticmethod
    def _format_elapsed(seconds: float) -> str:
        """Format a duration in seconds as ``H:MM:SS`` (negative values clamp to zero)."""
        whole = max(0, int(seconds))
        return f"{whole // 3600}:{whole % 3600 // 60:02d}:{whole % 60:02d}"

    def _create_active_calls_panel(self) -> Panel:
        """Create a panel with active tool calls.

        Returns:
            Panel with one progress bar per active call followed by a table
            of each call's parameters
        """
        if not self.active_calls:
            return Panel(
                "No active tool calls",
                title="[bold blue]Active Tool Calls[/bold blue]",
                border_style="blue",
                box=ROUNDED,
            )

        # A new Progress is built on every refresh, so task ids cannot be
        # carried over between renders: every call is (re-)added as a task
        # here. For the same reason the elapsed time is computed from the
        # call's own start_time instead of the task's (always fresh) clock.
        progress = Progress(
            TextColumn("[bold blue]{task.description}"),
            BarColumn(),
            TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
            TextColumn("[progress.elapsed]{task.fields[elapsed]}"),
            expand=True,
            console=self.console,
        )
        table = Table(show_header=True, header_style="bold cyan", box=ROUNDED, expand=True)
        table.add_column("Tool")
        table.add_column("Parameters")

        now = time.time()
        for call_id, call_info in self.active_calls.items():
            progress.add_task(
                f"{call_info['tool_name']} ({call_id[:6]}...)",
                total=100,
                completed=int(call_info["progress"] * 100),
                elapsed=self._format_elapsed(now - call_info["start_time"]),
            )
            params = call_info.get("parameters", {})
            if params:
                formatted_params = "\n".join(
                    f"{key}: {self._format_value(value)}" for key, value in params.items()
                )
            else:
                formatted_params = "None"
            # Text() keeps brackets in parameter values from being parsed as markup.
            table.add_row(call_info["tool_name"], Text(formatted_params))

        body = Table.grid(expand=True)
        body.add_row(progress)
        body.add_row(table)
        return Panel(
            body,
            title="[bold blue]Active Tool Calls[/bold blue]",
            border_style="blue",
            box=ROUNDED,
        )

    def _create_completed_calls_panel(self) -> Panel:
        """Create a panel with completed tool calls.

        Returns:
            Panel listing the ``max_completed_calls`` most recent results
        """
        if not self.completed_calls:
            return Panel(
                "No completed tool calls",
                title="[bold green]Recent Tool Results[/bold green]",
                border_style="green",
                box=ROUNDED,
            )

        table = Table(show_header=True, header_style="bold green", box=ROUNDED, expand=True)
        for heading in ("Tool", "Status", "Time", "Result Preview"):
            table.add_column(heading)

        # Show only the most recent completed calls
        for call_info in self.completed_calls[-self.max_completed_calls:]:
            tool_name = call_info["tool_name"]
            status = call_info["status"]
            execution_time = f"{call_info['execution_time']:.2f}s"

            result = call_info.get("result", "")
            if result not in (None, ""):
                preview = self._format_result_preview(result, tool_name)
            else:
                preview = "No result"

            status_text = Text(str(status), style="bold green" if status == "success" else "bold red")
            # The preview is raw tool output: render it as literal text, not markup.
            table.add_row(tool_name, status_text, execution_time, Text(preview))

        return Panel(
            table,
            title="[bold green]Recent Tool Results[/bold green]",
            border_style="green",
            box=ROUNDED,
        )

    def _format_value(self, value: Any) -> str:
        """Format a parameter value for display.

        Args:
            value: Parameter value

        Returns:
            Indented JSON for dicts and lists, ``str(value)`` otherwise. Nested
            objects JSON cannot encode are stringified; if encoding still
            fails (e.g. non-string-like keys, circular references) the plain
            ``str()`` form is returned instead of raising.
        """
        if isinstance(value, (dict, list)):
            try:
                # default=str is acceptable here: the output is display-only.
                return json.dumps(value, indent=2, default=str)
            except (TypeError, ValueError):
                return str(value)
        return str(value)

    def _format_result_preview(self, result: str, tool_name: str) -> str:
        """Format a result preview.

        Args:
            result: Result value; non-strings are converted with ``str()``
            tool_name: Name of the tool (currently unused)

        Returns:
            At most 200 characters of the result (plus "..." when truncated)
            with newlines shown as a literal backslash-n
        """
        text = result if isinstance(result, str) else str(result)
        preview = text[:200] + "..." if len(text) > 200 else text
        # Keep the preview on a single table line
        return preview.replace("\n", "\\n")

    def start(self) -> None:
        """Start the visualization."""
        self.live.start()
        self.refresh()

    def stop(self) -> None:
        """Stop the visualization."""
        self.live.stop()

    def refresh(self) -> None:
        """Rebuild both panels from current state and redraw the live display."""
        self.layout["active"].update(self._create_active_calls_panel())
        self.layout["completed"].update(self._create_completed_calls_panel())
        self.live.refresh()

    def add_tool_call(self, tool_call_id: str, tool_name: str, parameters: Dict[str, Any]) -> None:
        """Add a new tool call to visualize.

        Args:
            tool_call_id: ID of the tool call
            tool_name: Name of the tool
            parameters: Tool parameters
        """
        self.active_calls[tool_call_id] = {
            "tool_name": tool_name,
            "parameters": parameters,
            "start_time": time.time(),
            "progress": 0.0,
        }
        self.refresh()

    def update_progress(self, tool_call_id: str, progress: float) -> None:
        """Update the progress of a tool call.

        Unknown call ids are ignored.

        Args:
            tool_call_id: ID of the tool call
            progress: Progress value (0-1)
        """
        if tool_call_id not in self.active_calls:
            return
        self.active_calls[tool_call_id]["progress"] = progress
        self.refresh()

    def complete_tool_call(self, tool_call_id: str, result: ToolResult) -> None:
        """Mark a tool call as complete.

        Moves the call from the active set to the completed history and, for
        successful results, records the result in ``recent_tool_results``.
        Unknown call ids are ignored.

        Args:
            tool_call_id: ID of the tool call
            result: Tool execution result
        """
        if tool_call_id not in self.active_calls:
            return

        call_info = dict(
            self.active_calls[tool_call_id],
            result=result.result,
            status=result.status,
            execution_time=result.execution_time,
            end_time=time.time(),
        )
        self.completed_calls.append(call_info)
        # Let the history grow to twice the display size before trimming
        if len(self.completed_calls) > self.max_completed_calls * 2:
            self.completed_calls = self.completed_calls[-self.max_completed_calls:]
        del self.active_calls[tool_call_id]

        # Store in recent tool results for routines
        if result.status == "success":
            self.recent_tool_results.append(result)
            if len(self.recent_tool_results) > self.max_recent_results:
                self.recent_tool_results.pop(0)

        self.refresh()

    @staticmethod
    def _extract_fenced_code(content: str) -> Optional[tuple]:
        """Return ``(language, code)`` for the first ``` fence that names a language.

        Returns None when no fenced segment has both a language tag on its
        first line and a non-empty body.
        """
        # Odd-indexed segments are the ones that follow an opening fence.
        for segment in content.split("```")[1::2]:
            first_line, _, code = segment.partition("\n")
            lang = first_line.strip()
            if lang and code:
                return lang, code
        return None

    def show_result_detail(self, result: ToolResult) -> None:
        """Display detailed result information.

        Results that look like code are syntax-highlighted: the first fenced
        block with a language tag is shown in that language; otherwise the
        whole text is highlighted as Python. Everything else, and any result
        whose highlighting fails, is printed as literal text.

        Args:
            result: Tool execution result
        """
        content = result.result
        if not isinstance(content, str):
            content = "" if content is None else str(content)
        # Text objects are used for title and body so that brackets in tool
        # output are never interpreted as Rich markup.
        title = Text(f"Result: {result.name}", style="bold")

        looks_like_code = content.startswith(("def ", "class ", "import ", "from ")) or "```" in content
        if looks_like_code:
            fenced = self._extract_fenced_code(content)
            lang, code = fenced if fenced is not None else ("python", content)
            try:
                syntax = Syntax(code, lang, theme="monokai", line_numbers=True)
                self.console.print(Panel(syntax, title=title))
                return
            except Exception:
                # Best effort: fall back to plain text below.
                logger.debug("Syntax highlighting failed for result %r", result.name, exc_info=True)

        self.console.print(Panel(Text(content), title=title))
class MCTSVisualizer:
    """Visualizes the Monte Carlo Tree Search process in real-time with enhanced intelligence.

    Search nodes are duck-typed: the visualizer reads ``children`` (a mapping
    of action -> node), ``visits``, ``value`` and ``parent`` via ``getattr``
    and tolerates their absence.
    """

    def __init__(self, console: Console):
        """Initialize the MCTS visualizer.

        Args:
            console: Rich console instance
        """
        self.console = console
        self.root_node = None
        self.current_iteration = 0
        self.max_iterations = 0
        self.best_action = None
        self.active_simulation = None
        self.simulation_path = []
        # Normally set by update_iteration(); initialised here because the
        # panel builders read them on the very first refresh().
        self.selected_node = None
        self.expanded_node = None
        self.simulation_result = None
        self.layout = self._create_layout()
        self.live = Live(self.layout, console=console, refresh_per_second=10, auto_refresh=False)
        # Intelligence enhancement - track history
        self.action_history = {}  # action string -> {"visits", "total_value", "iterations"}
        self.visit_distribution = {}  # depth -> {"visits", "nodes"}
        self.exploration_patterns = []  # one record per iteration with a selected node
        self.quality_metrics = {"search_efficiency": 0.0, "exploration_balance": 0.0}
        self.auto_improvement_enabled = True

    def _create_layout(self) -> Layout:
        """Create the layout for MCTS visualization.

        Returns:
            Layout with header, main (tree / simulation / insights),
            intelligence and stats sections
        """
        layout = Layout()
        layout.split(
            Layout(name="header", size=3),
            Layout(name="main"),
            Layout(name="intelligence", size=7),
            Layout(name="stats", size=5),
        )
        layout["main"].split_row(
            Layout(name="tree", ratio=2),
            Layout(name="simulation", ratio=1),
            Layout(name="insights", ratio=1),
        )
        return layout

    @staticmethod
    def _copy_action_history(history: Dict[Any, Dict[str, Any]]) -> Dict[Any, Dict[str, Any]]:
        """Copy a caller-supplied action history and fill in missing counters.

        The copy keeps later bookkeeping from mutating the caller's data, and
        the defaults guarantee the keys the panels and update_iteration() read.
        """
        copied = {}
        for action, stats in history.items():
            entry = dict(stats)
            entry.setdefault("visits", 0)
            entry.setdefault("total_value", 0.0)
            entry["iterations"] = list(entry.get("iterations", []))
            copied[action] = entry
        return copied

    def set_search_parameters(self, root_node: Any, max_iterations: int,
                              additional_params: Optional[Dict[str, Any]] = None) -> None:
        """Set the search parameters with enhanced intelligence options.

        Resets the iteration counter and all tracked history, then redraws.

        Args:
            root_node: The root node of the search tree
            max_iterations: Maximum number of iterations
            additional_params: Optional settings. ``auto_improvement`` (bool)
                toggles the header status; ``initial_action_bias`` seeds the
                action history (a mapping of action string to a stats dict).
        """
        self.root_node = root_node
        self.max_iterations = max_iterations
        self.current_iteration = 0
        self.action_history = {}
        self.visit_distribution = {}
        self.exploration_patterns = []

        if additional_params:
            self.auto_improvement_enabled = additional_params.get("auto_improvement", True)
            initial_bias = additional_params.get("initial_action_bias")
            if initial_bias:
                self.action_history = self._copy_action_history(initial_bias)

        self.refresh()

    def update_iteration(self, iteration: int, selected_node: Any = None,
                         expanded_node: Any = None, simulation_path: List[Any] = None,
                         simulation_result: float = None, best_action: Any = None,
                         node_values: Dict[str, float] = None) -> None:
        """Update the current iteration status with enhanced tracking.

        Args:
            iteration: Current iteration number
            selected_node: Node selected in this iteration
            expanded_node: Node expanded in this iteration
            simulation_path: Path of the simulation, as (state, action) pairs
            simulation_result: Result of the simulation
            best_action: Current best action; None keeps the previous one
            node_values: Values of important nodes in the search (accepted for
                interface compatibility; not used by the visualizer)
        """
        self.current_iteration = iteration
        self.selected_node = selected_node
        self.expanded_node = expanded_node
        self.simulation_path = simulation_path or []
        self.simulation_result = simulation_result
        if best_action is not None:
            self.best_action = best_action

        # Credit every action on the simulated path with this result
        if self.simulation_path and simulation_result is not None:
            for _state, action in self.simulation_path:
                if action is None:
                    continue
                stats = self.action_history.setdefault(
                    str(action), {"visits": 0, "total_value": 0.0, "iterations": []}
                )
                stats["visits"] += 1
                stats["total_value"] += simulation_result
                stats["iterations"].append(iteration)

        # Record where in the tree this iteration chose to explore
        if selected_node is not None:
            self.exploration_patterns.append({
                "iteration": iteration,
                "node_depth": self._get_node_depth(selected_node),
                "node_breadth": len(getattr(selected_node, "children", {})),
                "value_estimate": getattr(selected_node, "value", 0) / max(1, getattr(selected_node, "visits", 1)),
            })

        if self.root_node and hasattr(self.root_node, "children"):
            self._update_visit_distribution()
        self._update_quality_metrics()
        self.refresh()

    def start(self) -> None:
        """Start the visualization."""
        self.live.start()
        self.refresh()

    def stop(self) -> None:
        """Stop the visualization."""
        self.live.stop()

    def refresh(self) -> None:
        """Rebuild every panel from current state and redraw the live display."""
        header_content = (
            f"[bold blue]Enhanced Monte Carlo Tree Search - Iteration "
            f"{self.current_iteration}/{self.max_iterations}[/bold blue]"
        )
        # "is not None" so that falsy actions such as 0 are still reported
        if self.best_action is not None:
            header_content += f" | Best Action: {self.best_action}"
        intelligence_status = "[green]Enabled[/green]" if self.auto_improvement_enabled else "[yellow]Disabled[/yellow]"
        header_content += f" | Intelligent Search: {intelligence_status}"

        self.layout["header"].update(Panel(header_content, border_style="blue"))
        self.layout["tree"].update(self._create_tree_panel())
        self.layout["simulation"].update(self._create_simulation_panel())
        self.layout["insights"].update(self._create_insights_panel())
        self.layout["intelligence"].update(self._create_intelligence_panel())
        self.layout["stats"].update(self._create_stats_panel())
        self.live.refresh()

    @staticmethod
    def _shade_value(value: float, precision: int) -> str:
        """Return *value* as markup: green above 0.7, blue above 0.4, else yellow."""
        if value > 0.7:
            color = "green"
        elif value > 0.4:
            color = "blue"
        else:
            color = "yellow"
        return f"[{color}]{value:.{precision}f}[/{color}]"

    @staticmethod
    def _rank_child(parent: Any, child: Any) -> float:
        """Score a child for display ordering (UCB-style).

        Average value plus ``ln(parent visits) / child visits``; unvisited
        children score 0.
        """
        import math

        visits = getattr(child, "visits", 0)
        if visits <= 0:
            return 0
        exploitation = getattr(child, "value", 0) / visits
        parent_visits = getattr(parent, "visits", 0)
        exploration = math.log(parent_visits) / visits if parent_visits > 0 else 0
        return exploitation + exploration

    def _node_style(self, child: Any) -> str:
        """Pick the Rich style for a tree node.

        The selected and expanded nodes are highlighted; other visited nodes
        are coloured by average value; unvisited nodes get no style ("").
        """
        if child is self.selected_node:
            return "bold yellow"
        if child is self.expanded_node:
            return "bold green"
        visits = getattr(child, "visits", 0)
        if visits <= 0:
            return ""
        avg_value = getattr(child, "value", 0) / visits
        if avg_value > 0.7:
            return "green"
        if avg_value > 0.4:
            return "blue"
        if avg_value > 0.2:
            return "yellow"
        return "red"

    def _node_label(self, action: Any, child: Any) -> Text:
        """Build the tree label for *child*, reached via *action*.

        A Text object (rather than a markup string) is returned so that an
        empty style and brackets in the action name render safely.
        """
        import math

        visits = getattr(child, "visits", 0)
        if visits > 0:
            avg_value = getattr(child, "value", 0) / visits
            confidence = min(1.0, math.sqrt(visits) / 5) * 100  # Simple confidence estimate
            caption = f"{action}: (Visits: {visits}, Value: {avg_value:.3f}, Conf: {confidence:.0f}%)"
        else:
            caption = f"{action}: (New)"
        return Text(caption, style=self._node_style(child))

    def _create_tree_panel(self) -> Panel:
        """Create a panel showing the current state of the search tree.

        At most 3 levels are shown, and per node only the 5 best-ranked
        children (see _rank_child).

        Returns:
            Panel with tree visualization
        """
        if not self.root_node:
            return Panel("No search tree initialized", title="[bold]Search Tree[/bold]")

        from rich.tree import Tree

        tree = Tree("🔍 Root Node", guide_style="bold blue")
        max_depth = 3
        max_children = 5

        def add_children(node, branch, depth=0):
            if depth >= max_depth or not node or not hasattr(node, "children"):
                return
            ranked = sorted(
                node.children.items(),
                key=lambda pair: self._rank_child(node, pair[1]),
                reverse=True,
            )
            for action, child in ranked[:max_children]:
                add_children(child, branch.add(self._node_label(action, child)), depth + 1)

        add_children(self.root_node, tree)
        return Panel(tree, title="[bold]Search Tree[/bold]", border_style="blue")

    def _create_simulation_panel(self) -> Panel:
        """Create a panel showing the current simulation with enhanced analytics.

        Returns:
            Panel listing each simulated step with the action's historical
            average value, followed by the simulation result if known
        """
        if not self.simulation_path:
            return Panel("No active simulation", title="[bold]Current Simulation[/bold]")

        table = Table(box=None, expand=True)
        for heading in ("Step", "Action", "Expected Value"):
            table.add_column(heading)

        for step, (_state, action) in enumerate(self.simulation_path, start=1):
            expected_value = "N/A"
            history = self.action_history.get(str(action) if action is not None else "None")
            if history and history["visits"] > 0:
                expected_value = f"{history['total_value'] / history['visits']:.3f}"
            table.add_row(f"Step {step}", Text(f"{action}"), expected_value)

        if self.simulation_result is not None:
            if self.simulation_result > 0.7:
                path_quality = "[bold green]High[/bold green]"
            elif self.simulation_result > 0.4:
                path_quality = "[yellow]Medium[/yellow]"
            else:
                path_quality = "[red]Low[/red]"
            table.add_row(
                "Result",
                f"[bold green]{self.simulation_result:.3f}[/bold green]",
                f"Path Quality: {path_quality}",
            )

        return Panel(table, title="[bold]Current Simulation[/bold]", border_style="green")

    def _action_trend(self, iterations: List[int]) -> str:
        """Return an arrow describing how recently an action was used.

        With fewer than 5 recorded iterations the trend is neutral. Otherwise
        "↑" if the action was used within the last 5 iterations, "↓" if not
        within the last 10, and neutral in between.
        """
        if len(iterations) < 5:
            return "→"
        gap = self.current_iteration - max(iterations[-3:])
        if gap <= 5:
            return "↑"
        if gap >= 10:
            return "↓"
        return "→"

    def _create_insights_panel(self) -> Panel:
        """Create a panel showing action insights from learned patterns.

        Returns:
            Panel with the five best actions (by average value) that have at
            least three visits
        """
        if not self.action_history:
            return Panel("No action insights available yet", title="[bold]Action Insights[/bold]")

        # Only consider actions with enough samples
        candidates = [
            (action, data["total_value"] / data["visits"], data["visits"])
            for action, data in self.action_history.items()
            if data["visits"] >= 3
        ]
        candidates.sort(key=lambda item: item[1], reverse=True)

        table = Table(box=None, expand=True)
        for heading in ("Action", "Avg Value", "Visits", "Trend"):
            table.add_column(heading)

        for action, avg_value, visits in candidates[:5]:
            trend = self._action_trend(self.action_history[action].get("iterations", []))
            table.add_row(Text(str(action)), self._shade_value(avg_value, 3), str(visits), trend)

        return Panel(table, title="[bold]Action Insights[/bold]", border_style="cyan")

    def _depth_trend(self) -> Optional[str]:
        """Compare the last five exploration depths with all earlier ones.

        Returns:
            "Deepening", "Shallowing" or "Stable" (20% thresholds), or None
            when there are not more than five samples, i.e. no earlier
            baseline to compare against.
        """
        patterns = self.exploration_patterns
        if len(patterns) <= 5:
            return None
        recent_avg = sum(p["node_depth"] for p in patterns[-5:]) / 5
        earlier = patterns[:-5]
        earlier_avg = sum(p["node_depth"] for p in earlier) / len(earlier)
        if recent_avg > earlier_avg * 1.2:
            return "Deepening"
        if recent_avg < earlier_avg * 0.8:
            return "Shallowing"
        return "Stable"

    def _search_balance(self) -> Optional[str]:
        """Classify the last (up to) ten selections by their value estimate.

        Returns:
            "Heavy Exploitation" when more than 70% had an estimate above 0.5,
            "Heavy Exploration" when fewer than 30% did, "Balanced" otherwise,
            or None with fewer than three samples.
        """
        if len(self.exploration_patterns) < 3:
            return None
        recent = self.exploration_patterns[-10:]
        exploitation_ratio = sum(1 for p in recent if p["value_estimate"] > 0.5) / len(recent)
        if exploitation_ratio > 0.7:
            return "Heavy Exploitation"
        if exploitation_ratio < 0.3:
            return "Heavy Exploration"
        return "Balanced"

    def _create_intelligence_panel(self) -> Panel:
        """Create a panel showing intelligence metrics and learning patterns.

        Returns:
            Panel with the quality metrics next to the exploration summary
        """
        from rich.columns import Columns

        metrics_table = Table(box=None, expand=True)
        metrics_table.add_column("Metric")
        metrics_table.add_column("Value")
        for metric, value in self.quality_metrics.items():
            metrics_table.add_row(metric.replace("_", " ").title(), self._shade_value(value, 2))

        exploration_table = Table(box=None, expand=True)
        exploration_table.add_column("Pattern")
        exploration_table.add_column("Value")
        if self.exploration_patterns:
            avg_depth = sum(p["node_depth"] for p in self.exploration_patterns) / len(self.exploration_patterns)
            exploration_table.add_row("Avg Exploration Depth", f"{avg_depth:.2f}")

            trend_colors = {"Deepening": "green", "Shallowing": "yellow", "Stable": "blue"}
            trend = self._depth_trend()
            if trend is not None:
                color = trend_colors[trend]
                exploration_table.add_row("Depth Trend", f"[{color}]{trend}[/{color}]")

            balance = self._search_balance()
            if balance is not None:
                color = "green" if balance == "Balanced" else "yellow"
                exploration_table.add_row("Search Balance", f"[{color}]{balance}[/{color}]")

        return Panel(
            Columns([metrics_table, exploration_table]),
            title="[bold]Intelligence Metrics[/bold]",
            border_style="magenta",
        )

    def _collect_tree_stats(self) -> tuple:
        """Walk the tree below ``root_node`` and summarise its shape.

        Returns:
            ``(total_nodes, max_depth, avg_branching)`` where avg_branching is
            the mean number of children over nodes that have at least one
            child (0 if there are none).
        """
        total_nodes = 0
        max_depth = 0
        total_children = 0
        internal_nodes = 0

        pending = [(self.root_node, 0)]
        while pending:
            node, depth = pending.pop()
            if not node or not hasattr(node, "children"):
                continue
            total_nodes += 1
            max_depth = max(max_depth, depth)
            if node.children:
                internal_nodes += 1
                total_children += len(node.children)
                pending.extend((child, depth + 1) for child in node.children.values())

        avg_branching = total_children / internal_nodes if internal_nodes else 0
        return total_nodes, max_depth, avg_branching

    def _progress_fraction(self) -> float:
        """Return current_iteration / max_iterations, or 0.0 when no maximum is set."""
        if not self.max_iterations:
            return 0.0
        return self.current_iteration / self.max_iterations

    def _create_stats_panel(self) -> Panel:
        """Create a panel showing search statistics with enhanced metrics.

        Returns:
            Panel with statistics
        """
        if not self.root_node:
            return Panel("No statistics available", title="[bold]Search Statistics[/bold]")

        total_nodes, max_depth, avg_branching = self._collect_tree_stats()
        total_visits = getattr(self.root_node, "visits", 0)

        table = Table(box=None, expand=True)
        table.add_column("Metric")
        table.add_column("Value")
        table.add_row("Total Nodes", str(total_nodes))
        table.add_row("Max Depth", str(max_depth))
        table.add_row("Total Visits", str(total_visits))
        table.add_row("Avg Branching", f"{avg_branching:.2f}")
        table.add_row("Progress", f"{self._progress_fraction():.1%}")
        # Nodes discovered per root visit
        if total_visits > 0:
            table.add_row("Search Efficiency", f"{total_nodes / total_visits:.2f}")

        return Panel(table, title="[bold]Search Statistics[/bold]", border_style="magenta")

    def _get_node_depth(self, node):
        """Calculate the depth of a node by following ``parent`` links to the root."""
        depth = 0
        current = node
        while getattr(current, "parent", None) is not None:
            depth += 1
            current = current.parent
        return depth

    def _update_visit_distribution(self):
        """Recompute ``visit_distribution``: per depth, total visits and node count."""
        levels = {}
        pending = [(self.root_node, 0)]
        while pending:
            node, depth = pending.pop()
            if not node or not hasattr(node, "children"):
                continue
            level = levels.setdefault(depth, {"visits": 0, "nodes": 0})
            level["visits"] += getattr(node, "visits", 0)
            level["nodes"] += 1
            pending.extend((child, depth + 1) for child in node.children.values())
        self.visit_distribution = levels

    def _update_quality_metrics(self):
        """Update quality metrics for the search process.

        ``search_efficiency`` is the share of recorded visits that fall below
        the root. ``exploration_balance`` is the variance of the last 20
        selected-node depths, scaled so that a variance of 5 or more maps
        to 1.0.
        """
        import statistics

        if self.visit_distribution:
            total_visits = sum(level["visits"] for level in self.visit_distribution.values())
            below_root = sum(
                level["visits"] for depth, level in self.visit_distribution.items() if depth > 0
            )
            if total_visits > 0:
                self.quality_metrics["search_efficiency"] = below_root / total_visits

        depths = [p["node_depth"] for p in self.exploration_patterns[-20:]]
        if depths:
            try:
                depth_variance = statistics.variance(depths) if len(depths) > 1 else 0
            except statistics.StatisticsError:
                return
            self.quality_metrics["exploration_balance"] = min(1.0, depth_variance / 5.0)
class ParallelExecutionVisualizer:
    """Visualizes parallel execution of tool calls in real-time.

    Keeps a registry of running executions and the 20 most recently finished
    ones, rendered as a header, an executions area and a metrics panel.
    """

    def __init__(self, console: Console):
        """Initialize the parallel execution visualizer.

        Args:
            console: Rich console instance
        """
        self.console = console
        # execution id -> {"tool_name", "parameters", "start_time", "progress", "status"}
        self.active_executions = {}
        self.completed_executions = []
        self.layout = self._create_layout()
        self.live = Live(self.layout, console=console, refresh_per_second=10, auto_refresh=False)

    def _create_layout(self) -> Layout:
        """Create the layout for parallel execution visualization.

        Returns:
            Layout with "header", "executions" and "metrics" rows
        """
        layout = Layout()
        layout.split(
            Layout(name="header", size=3),
            Layout(name="executions"),
            Layout(name="metrics", size=5),
        )
        return layout

    def add_execution(self, execution_id: str, tool_name: str, parameters: Dict[str, Any]) -> None:
        """Add a new execution to visualize.

        Args:
            execution_id: Unique ID for the execution
            tool_name: Name of the tool being executed
            parameters: Parameters for the execution
        """
        self.active_executions[execution_id] = {
            "tool_name": tool_name,
            "parameters": parameters,
            "start_time": time.time(),
            "progress": 0.0,
            "status": "running",
        }
        self.refresh()

    def update_progress(self, execution_id: str, progress: float) -> None:
        """Update the progress of an execution.

        Unknown execution ids are ignored.

        Args:
            execution_id: ID of the execution
            progress: Progress value (0-1)
        """
        if execution_id not in self.active_executions:
            return
        self.active_executions[execution_id]["progress"] = progress
        self.refresh()

    def complete_execution(self, execution_id: str, result: Any, status: str = "success") -> None:
        """Mark an execution as complete.

        Moves the execution to the completed list (capped at the 20 most
        recent entries) and records its wall-clock duration. Unknown
        execution ids are ignored.

        Args:
            execution_id: ID of the execution
            result: Result of the execution
            status: Status of completion
        """
        if execution_id not in self.active_executions:
            return

        finished_at = time.time()
        execution = dict(self.active_executions.pop(execution_id))
        execution["end_time"] = finished_at
        execution["duration"] = finished_at - execution["start_time"]
        execution["result"] = result
        execution["status"] = status

        self.completed_executions.append(execution)
        if len(self.completed_executions) > 20:
            self.completed_executions = self.completed_executions[-20:]
        self.refresh()

    def start(self) -> None:
        """Start the visualization."""
        self.live.start()
        self.refresh()

    def stop(self) -> None:
        """Stop the visualization."""
        self.live.stop()

    def refresh(self) -> None:
        """Rebuild every section from current state and redraw the live display."""
        header_content = (
            f"[bold blue]Parallel Execution Monitor[/bold blue] | "
            f"Active: {len(self.active_executions)} | Completed: {len(self.completed_executions)}"
        )
        self.layout["header"].update(Panel(header_content, border_style="blue"))
        self.layout["executions"].update(self._create_executions_panel())
        self.layout["metrics"].update(self._create_metrics_panel())
        self.live.refresh()

    @staticmethod
    def _preview(result: Any) -> str:
        """Return ``str(result)`` cut to 50 characters, with "..." when truncated."""
        text = str(result)
        return text[:50] + "..." if len(text) > 50 else text

    def _create_executions_panel(self) -> Layout:
        """Create the view of active and recent executions.

        Returns:
            Layout with an "active" panel (one progress bar per running
            execution) above a "completed" panel (ten most recent, newest
            first)
        """
        progress_group = Table.grid(expand=True)
        if self.active_executions:
            # The Progress object is rebuilt on each refresh, so its own task
            # clock always starts at zero; elapsed time is therefore computed
            # from the execution's recorded start_time and passed as a field.
            progress = Progress(
                TextColumn("[bold blue]{task.description}"),
                BarColumn(bar_width=None),
                TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
                TextColumn("| Elapsed: {task.fields[elapsed_s]:.2f}s"),
                expand=True,
            )
            now = time.time()
            for exec_id, execution in self.active_executions.items():
                progress.add_task(
                    f"{execution['tool_name']} ({exec_id[:8]}...)",
                    total=100,
                    completed=int(execution["progress"] * 100),
                    elapsed_s=now - execution["start_time"],
                )
            progress_group.add_row(progress)
        else:
            progress_group.add_row("[italic]No active executions[/italic]")

        completed_table = Table(show_header=True, header_style="bold blue", expand=True)
        for heading in ("Tool", "Duration", "Status", "Result Preview"):
            completed_table.add_column(heading)

        if self.completed_executions:
            # Most recent first
            for execution in reversed(self.completed_executions[-10:]):
                status = execution["status"]
                # Text objects keep status/result content from being parsed as markup.
                completed_table.add_row(
                    execution["tool_name"],
                    f"{execution['duration']:.2f}s",
                    Text(str(status), style="green" if status == "success" else "red"),
                    Text(self._preview(execution.get("result", ""))),
                )
        else:
            completed_table.add_row("[italic]No completed executions[/italic]", "", "", "")

        # Give the active panel room for its bars plus the border
        active_height = len(self.active_executions) * 2 + 3 if self.active_executions else 3
        layout = Layout()
        layout.split(
            Layout(name="active", size=active_height),
            Layout(name="completed"),
        )
        layout["active"].update(Panel(progress_group, title="[bold]Active Executions[/bold]", border_style="blue"))
        layout["completed"].update(Panel(completed_table, title="[bold]Recent Completions[/bold]", border_style="green"))
        return layout

    def _compute_metrics(self) -> Dict[str, Any]:
        """Summarise the retained completed executions.

        Returns:
            Dict with "total", "success_rate" (share with status "success"),
            "avg_duration" in seconds (both 0 when nothing has completed) and
            "parallelism" (number of active executions)
        """
        total = len(self.completed_executions)
        if total:
            successful = sum(1 for e in self.completed_executions if e["status"] == "success")
            success_rate = successful / total
            avg_duration = sum(e["duration"] for e in self.completed_executions) / total
        else:
            success_rate = 0
            avg_duration = 0
        return {
            "total": total,
            "success_rate": success_rate,
            "avg_duration": avg_duration,
            "parallelism": len(self.active_executions),
        }

    def _create_metrics_panel(self) -> Panel:
        """Create a panel showing execution metrics.

        Returns:
            Panel with metrics visualization
        """
        metrics = self._compute_metrics()

        table = Table(box=None, expand=True)
        table.add_column("Metric")
        table.add_column("Value")
        table.add_row("Total Executions", str(metrics["total"]))
        table.add_row("Success Rate", f"{metrics['success_rate']:.1%}")
        table.add_row("Average Duration", f"{metrics['avg_duration']:.2f}s")
        table.add_row("Current Parallelism", str(metrics["parallelism"]))
        return Panel(table, title="[bold]Execution Metrics[/bold]", border_style="magenta")
class MultiPanelLayout:
    """Creates a multi-panel layout for the entire UI.

    Sections: "conversation", "tools" (split into "active_tools" and "cost")
    and "input".
    """

    def __init__(self, console: Console):
        """Initialize the multi-panel layout.

        Args:
            console: Rich console instance
        """
        self.console = console
        self.layout = self._create_layout()
        self.live = Live(self.layout, console=console, refresh_per_second=4, auto_refresh=False)

    def _create_layout(self) -> Layout:
        """Create the main application layout.

        Returns:
            Layout with conversation / tools / input rows in a 3:2:1 ratio,
            the tools row split into "active_tools" and a 30-column "cost"
        """
        layout = Layout()
        layout.split(
            Layout(name="conversation", ratio=3),
            Layout(name="tools", ratio=2),
            Layout(name="input", ratio=1),
        )
        layout["tools"].split_row(
            Layout(name="active_tools"),
            Layout(name="cost", size=30),
        )
        return layout

    def start(self) -> None:
        """Start the live display."""
        self.live.start()

    def stop(self) -> None:
        """Stop the live display."""
        self.live.stop()

    def refresh(self) -> None:
        """Refresh the display."""
        self.live.refresh()

    def update_section(self, section: str, content: Any) -> None:
        """Update a section of the layout.

        Unknown section names are ignored.

        Args:
            section: Section name
            content: Content to display
        """
        # Look the section up with Layout.get(), which returns None for an
        # unknown name. An ``in`` test is not usable here: Layout offers
        # name lookup but no membership protocol, so ``in`` would fall back
        # to integer indexing and raise KeyError.
        target = self.layout.get(section)
        if target is None:
            return
        target.update(content)
        self.refresh()