Skip to main content
Glama

MCP Interactive Service

ui_cli.py18.4 kB
#!/usr/bin/env python # -*- coding: utf-8 -*- """ Command Line Interface Implementation """ from typing import List, Dict, Any, Optional, Union from fastmcp import Context from rich.console import Console from rich.panel import Panel from rich.markdown import Markdown import subprocess import sys import os import tempfile import json import asyncio import time from lang_manager import get_text END_MARKER = get_text("input_end_marker") if END_MARKER == "NotDefined": END_MARKER = "END" print("CLI END_MARKER", END_MARKER) class CommandLineUI: """Command Line Interface Implementation Class""" def __init__(self): """Initialize command line interface""" self.console = Console() async def select_option( self, options: List[Union[str, Dict[str, Any]]], prompt: str = "Please select one of the following options", ctx: Context = None ) -> Dict[str, Any]: """ Present a set of options to the user for selection by entering numbers or providing custom answers. Will pop up a new command line window to display options and get user input. Args: options: List of options, can be a list of strings or dictionaries prompt: Prompt message displayed to the user ctx: FastMCP context object Returns: Dictionary containing the selection result, in the format: { "selected_index": int, # Index of the user's selection, -1 if custom answer "selected_option": Any, # Content of the user's selected option "custom_input": str, # User's custom input, if any "is_custom": bool # Whether it's a custom answer } """ if ctx: await ctx.info("Displaying options using command line interface...") try: # Create temporary data file with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False, encoding='utf-8') as data_file: data_path = data_file.name # Prepare text dictionary to pass to the temporary script ui_texts = { 'custom_input_tip': get_text('custom_input_tip'), 'input_option': get_text('input_option'), 'invalid_option': get_text('invalid_option'), 'custom_input': get_text('custom_input'), 'multiline_tip': get_text('multiline_tip'), 'end_marker': END_MARKER } # Write options and configuration to temporary file json.dump({ 'options': options, 'prompt': prompt, 'allow_custom': True, # 始终允许自定义输入 'ui_texts': ui_texts }, data_file) # Create temporary result file path result_path = f"{data_path}.result" # Create temporary script file script_content = """ import json import sys import os from rich.console import Console from rich.panel import Panel from rich.markdown import Markdown def main(): console = Console() # Read data file data_path = sys.argv[1] result_path = sys.argv[2] with open(data_path, 'r', encoding='utf-8') as f: data = json.load(f) options = data['options'] prompt = data['prompt'] allow_custom = data['allow_custom'] ui_texts = data.get('ui_texts', {}) end_marker = ui_texts.get('end_marker', 'END') # Format options formatted_options = [] for i, option in enumerate(options, 1): if isinstance(option, dict): # If option is a dictionary, try to get title or description title = option.get("title", option.get("name", option.get("description", f"Option {i}"))) description = option.get("description", "") formatted_option = f"{title}" if description: formatted_option += f"\\n {description}" formatted_options.append(formatted_option) else: # If option is a string, use directly formatted_options.append(str(option)) # Use Rich to display panel md_content = f"# {prompt}\\n\\n" for i, option in enumerate(formatted_options, 1): md_content += f"{i}. {option}\\n\\n" if allow_custom: md_content += f"*{ui_texts.get('custom_input_tip', 'Enter 0 to provide a custom answer')}*" console.print(Panel(Markdown(md_content), border_style="green")) # Get user input while True: try: user_choice = input(f"{ui_texts.get('input_option', 'Enter your choice')}: ").strip() # Handle custom input if user_choice == "0" and allow_custom: console.print(f"{ui_texts.get('custom_input', 'Enter your custom answer')}") console.print(f"{ui_texts.get('multiline_tip', 'You can enter multiple lines. Enter ' + end_marker + ' on a separate line to finish.')}") # Get multiline input input_lines = [] while True: line = input() if line.strip() == end_marker: break input_lines.append(line) custom_input = "\\n".join(input_lines) user_input = { "selected_index": -1, "selected_option": None, "custom_input": custom_input, "is_custom": True } break # Handle numeric choice choice_index = int(user_choice) - 1 if 0 <= choice_index < len(options): selected_option = options[choice_index] user_input = { "selected_index": choice_index, "selected_option": selected_option, "custom_input": "", "is_custom": False } break else: console.print(ui_texts.get('invalid_option', 'Invalid option, please try again'), style="bold red") except ValueError: console.print(ui_texts.get('invalid_option', 'Invalid option, please try again'), style="bold red") # Write result to file with open(result_path, 'w', encoding='utf-8') as f: json.dump(user_input, f) if __name__ == "__main__": try: main() except Exception as e: data_path = sys.argv[1] result_path = sys.argv[2] with open(result_path, "w", encoding="utf-8") as f: json.dump(f"Error: {str(e)}", f) # 自动关闭窗口,不需要用户手动按回车 """ with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as script_file: script_path = script_file.name script_file.write(script_content) if ctx: await ctx.info(f"Starting new command line window...") # Start new command line window to execute script if sys.platform == 'win32': # Windows platform cmd = f'start cmd /c "python {script_path} {data_path} {result_path}"' process = subprocess.Popen(cmd, shell=True) else: # Linux/Mac platform if sys.platform == 'darwin': # macOS cmd = ['osascript', '-e', f'tell app "Terminal" to do script "python {script_path} {data_path} {result_path}"'] else: # Linux cmd = ['x-terminal-emulator', '-e', f'python {script_path} {data_path} {result_path}'] process = subprocess.Popen(cmd) if ctx: await ctx.info(f"Waiting for user input in new window...") # Wait for user input in new window timeout = 300 # 5 minutes timeout start_time = time.time() # Wait for result file to appear while not os.path.exists(result_path): await asyncio.sleep(0.5) if time.time() - start_time > timeout: raise TimeoutError("Timeout waiting for user input") # Wait for file writing to complete await asyncio.sleep(0.5) # Read results with open(result_path, 'r', encoding='utf-8') as f: result = json.load(f) # Clean up temporary files try: os.remove(script_path) os.remove(data_path) os.remove(result_path) except: pass return result except Exception as e: if ctx: await ctx.error(f"Failed to start new window: {str(e)}") # Fall back to original command line implementation self.console.print(f"Failed to start new window, falling back to current window: {str(e)}", style="bold red") # Display options md_content = f"# {prompt}\n\n" for i, option in enumerate(options, 1): if isinstance(option, dict): title = option.get("title", option.get("name", option.get("description", f"Option {i}"))) description = option.get("description", "") formatted_option = f"{title}" if description: formatted_option += f"\n {description}" md_content += f"{i}. {formatted_option}\n\n" else: md_content += f"{i}. {option}\n\n" # 始终允许自定义输入 md_content += f"*{get_text('custom_input_tip')}*" self.console.print(Panel(Markdown(md_content), border_style="green")) # Get user input while True: try: user_choice = input(f"{get_text('input_option')}: ").strip() # Handle custom input - always allow custom input if user_choice == "0": self.console.print(f"{get_text('custom_input')}") self.console.print(f"{get_text('multiline_tip')}") # Get multiline input input_lines = [] while True: line = input() if line.strip() == END_MARKER: break input_lines.append(line) custom_input = "\n".join(input_lines) return { "selected_index": -1, "selected_option": None, "custom_input": custom_input, "is_custom": True } # Handle numeric choice choice_index = int(user_choice) - 1 if 0 <= choice_index < len(options): selected_option = options[choice_index] return { "selected_index": choice_index, "selected_option": selected_option, "custom_input": "", "is_custom": False } else: self.console.print(get_text('invalid_option'), style="bold red") except ValueError: self.console.print(get_text('invalid_option'), style="bold red") async def request_additional_info( self, prompt: str, ctx: Context = None ) -> str: """ Request supplementary information from the user using CLI interface Args: prompt: Prompt message ctx: FastMCP context Returns: User input information """ if ctx: await ctx.info("Requesting information using command line interface...") try: # Create temporary data file with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False, encoding='utf-8') as data_file: data_path = data_file.name # Prepare text dictionary to pass to the temporary script ui_texts = { 'multiline_tip': get_text('multiline_tip'), 'current_info': get_text('current_info'), 'input_prompt': get_text('input_prompt'), 'end_marker': END_MARKER } # Write options and configuration to temporary file json.dump({ 'prompt': prompt, 'ui_texts': ui_texts }, data_file) # Create temporary result file path result_path = f"{data_path}.result" # Create temporary script file script_content = """ import json import sys import os from rich.console import Console from rich.panel import Panel from rich.markdown import Markdown def main(): console = Console() # Read data file data_path = sys.argv[1] result_path = sys.argv[2] with open(data_path, 'r', encoding='utf-8') as f: data = json.load(f) prompt = data['prompt'] ui_texts = data.get('ui_texts', {}) end_marker = ui_texts.get('end_marker', 'END') # Build markdown content md_content = f"### {prompt}\\n\\n" # Add multiline input tip md_content += f"\\n{ui_texts.get('multiline_tip', 'You can enter multiple lines. Enter ' + end_marker + ' on a separate line to finish.')}\\n" # Display content console.print(Markdown(md_content)) # Get user input (multiline) print(f"{ui_texts.get('input_prompt', 'Input')}: ", end="", flush=True) input_lines = [] while True: line = input() if line.strip() == end_marker: break input_lines.append(line) user_input = "\\n".join(input_lines) # Write result to file with open(result_path, 'w', encoding='utf-8') as f: json.dump(user_input, f) if __name__ == "__main__": try: main() except Exception as e: data_path = sys.argv[1] result_path = sys.argv[2] with open(result_path, "w", encoding="utf-8") as f: json.dump(f"Error: {str(e)}", f) # 自动关闭窗口,不需要用户手动按回车 """ with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as script_file: script_path = script_file.name script_file.write(script_content) if ctx: await ctx.info(f"Starting new command line window...") # Start new command line window to execute script if sys.platform == 'win32': # Windows platform cmd = f'start cmd /c "python {script_path} {data_path} {result_path}"' process = subprocess.Popen(cmd, shell=True) else: # Linux/Mac platform if sys.platform == 'darwin': # macOS cmd = ['osascript', '-e', f'tell app "Terminal" to do script "python {script_path} {data_path} {result_path}"'] else: # Linux cmd = ['x-terminal-emulator', '-e', f'python {script_path} {data_path} {result_path}'] process = subprocess.Popen(cmd) if ctx: await ctx.info(f"Waiting for user input in new window...") # Wait for user input in new window timeout = 300 # 5 minutes timeout start_time = time.time() # Wait for result file to appear while not os.path.exists(result_path): await asyncio.sleep(0.5) if time.time() - start_time > timeout: raise TimeoutError("Timeout waiting for user input") # Wait for file writing to complete await asyncio.sleep(0.5) # Read results with open(result_path, 'r', encoding='utf-8') as f: result = json.load(f) # Clean up temporary files try: os.remove(script_path) os.remove(data_path) os.remove(result_path) except: pass return result except Exception as e: if ctx: await ctx.error(f"Failed to start new window: {str(e)}") # Fall back to original command line implementation self.console.print(f"Failed to start new window, falling back to current window: {str(e)}", style="bold red") # Build markdown content md_content = f"### {prompt}\n\n" if current_info: md_content += f"**{get_text('current_info')}**:\n\n{current_info}\n\n" # Add multiline input tip md_content += f"\n{get_text('multiline_tip')}\n" # Display content self.console.print(Markdown(md_content)) # Get user input (multiline) print(f"{get_text('input_prompt')}: ", end="", flush=True) input_lines = [] while True: line = input() if line.strip() == END_MARKER: break input_lines.append(line) return "\n".join(input_lines)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/DanielZhao1990/interaction-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server