Skip to main content
Glama

MCP Connection Hub

by claymore666
job.py7.59 kB
""" Job management commands for the MCP Connection Hub CLI. This module provides commands for managing long-running jobs. """ import typer from typing import Optional from rich.console import Console from rich.table import Table from rich.progress import Progress import time # Create Typer app for job commands app = typer.Typer(name="job", help="Manage long-running jobs") # Create Console for rich output console = Console() @app.command("list") def list_jobs( status: Optional[str] = typer.Option(None, "--status", "-s", help="Filter by status (running, completed, failed)"), limit: int = typer.Option(10, "--limit", "-n", help="Maximum number of jobs to show"), all: bool = typer.Option(False, "--all", "-a", help="Show all jobs (overrides limit)") ): """ List all jobs. """ # Placeholder implementation (will be implemented properly in future milestones) table = Table(title="Jobs") table.add_column("ID", style="cyan") table.add_column("Tool", style="blue") table.add_column("Status", style="green") table.add_column("Progress", style="yellow") table.add_column("Created", style="magenta") # Add placeholder data statuses = ["running", "completed", "failed"] tools = ["brave_web_search", "vector_db", "code_exec"] # Filter by status if specified if status is not None and status in statuses: filtered_statuses = [status] else: filtered_statuses = statuses # Sample job data for i in range(1, min(limit + 1, 20) if not all else 20): if i % 3 == 0: job_status = "running" progress = "75%" elif i % 3 == 1: job_status = "completed" progress = "100%" else: job_status = "failed" progress = "45%" if job_status in filtered_statuses: table.add_row( f"job_{i:03d}", tools[i % len(tools)], job_status, progress, f"2023-06-{i:02d} 12:34:56" ) console.print(table) @app.command("status") def job_status( job_id: str = typer.Argument(..., help="Job ID to check") ): """ Check the status of a specific job. """ # Placeholder implementation (will be implemented properly in future milestones) if job_id.startswith("job_"): # Example status for a running job console.print(f"[bold cyan]Job: {job_id}[/bold cyan]") console.print(f"Status: [green]running[/green]") console.print(f"Tool: brave_web_search") console.print(f"Function: web_search") console.print(f"Created: 2023-06-01 12:34:56") console.print(f"Progress: 75%") console.print(f"Estimated completion: 2 minutes remaining") console.print() # Sample job details console.print("[bold]Parameters:[/bold]") console.print(" query: 'MCP Connection Hub architecture'") console.print(" count: 10") # Progress visualization with Progress() as progress: task = progress.add_task("[cyan]Job progress", total=100, completed=75) else: console.print(f"[red]Job '{job_id}' not found[/red]") @app.command("cancel") def cancel_job( job_id: str = typer.Argument(..., help="Job ID to cancel"), force: bool = typer.Option(False, "--force", "-f", help="Force cancellation without confirmation") ): """ Cancel a running job. """ # Placeholder implementation (will be implemented properly in future milestones) if not force: confirm = typer.confirm(f"Are you sure you want to cancel job {job_id}?") if not confirm: console.print("[yellow]Cancellation aborted[/yellow]") return if job_id.startswith("job_"): console.print(f"[green]Job {job_id} has been cancelled[/green]") else: console.print(f"[red]Job '{job_id}' not found[/red]") @app.command("purge") def purge_jobs( status: str = typer.Option("completed", "--status", "-s", help="Status to purge (completed, failed, all)"), older_than: str = typer.Option("7d", "--older-than", "-o", help="Purge jobs older than this (e.g., '7d', '24h')"), force: bool = typer.Option(False, "--force", "-f", help="Force purge without confirmation") ): """ Purge old jobs from the system. """ # Placeholder implementation (will be implemented properly in future milestones) if not force: confirm = typer.confirm(f"Are you sure you want to purge {status} jobs older than {older_than}?") if not confirm: console.print("[yellow]Purge aborted[/yellow]") return with Progress() as progress: task = progress.add_task("[cyan]Purging jobs...", total=100) # Simulate purge process for i in range(0, 101, 10): time.sleep(0.1) # Simulate work progress.update(task, completed=i) console.print(f"[green]Successfully purged {status} jobs older than {older_than}[/green]") @app.command("results") def job_results( job_id: str = typer.Argument(..., help="Job ID to get results for"), format: str = typer.Option("table", "--format", "-f", help="Output format (table, json)"), output: Optional[str] = typer.Option(None, "--output", "-o", help="Output file path") ): """ Get the results of a completed job. """ # Placeholder implementation (will be implemented properly in future milestones) if not job_id.startswith("job_"): console.print(f"[red]Job '{job_id}' not found[/red]") return # Example results results = { "job_id": job_id, "status": "completed", "tool": "brave_web_search", "function": "web_search", "created": "2023-06-01 12:34:56", "completed": "2023-06-01 12:35:26", "duration": "30s", "results": [ { "title": "MCP Connection Hub Documentation", "url": "https://example.com/mcp-hub", "description": "Official documentation for the MCP Connection Hub." }, { "title": "Building Tools for AI Systems", "url": "https://example.com/ai-tools", "description": "Learn how to build tools for AI systems like MCP Connection Hub." } ] } if format == "json": import json output_json = json.dumps(results, indent=2) if output: try: with open(output, "w") as f: f.write(output_json) console.print(f"[green]Results saved to {output}[/green]") except Exception as e: console.print(f"[red]Error saving results: {str(e)}[/red]") console.print(output_json) else: console.print(output_json) else: # Table format table = Table(title=f"Results for {job_id}") table.add_column("Title", style="cyan") table.add_column("URL", style="blue") table.add_column("Description", style="green") for result in results["results"]: table.add_row( result["title"], result["url"], result["description"] ) console.print(table) if output: console.print(f"[yellow]Warning: Table format cannot be saved to file. Use --format json for file output.[/yellow]")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/claymore666/MCPConnectionHub'

If you have feedback or need assistance with the MCP directory API, please join our Discord server