Skip to main content
Glama
cli.py8.26 kB
"""Audit CLI for OpenAccess MCP.""" import sys from pathlib import Path from typing import Optional import typer from rich.console import Console from rich.table import Table from rich.panel import Panel from . import get_audit_logger, create_audit_signer app = typer.Typer(help="OpenAccess MCP Audit Tools") console = Console() @app.command() def verify( log_file: Path = typer.Argument( ..., help="Audit log file to verify" ), key_file: Optional[Path] = typer.Option( None, "--key-file", "-k", help="Public key file for verification" ) ): """Verify the integrity of an audit log.""" try: if not log_file.exists(): console.print(f"[red]Error: Audit log file not found: {log_file}[/red]") sys.exit(1) audit_logger = get_audit_logger(log_file) verification = audit_logger.verify_log_integrity() if "error" in verification: console.print(f"[red]Verification failed: {verification['error']}[/red]") sys.exit(1) # Display verification results table = Table(title="Audit Log Verification Results") table.add_column("Metric", style="cyan") table.add_column("Value", style="green") table.add_row("Total Records", str(verification["total_records"])) table.add_row("Chain Valid", "✓" if verification["chain_valid"] else "✗") table.add_row("First Record", verification["first_record"] or "N/A") table.add_row("Last Record", verification["last_record"] or "N/A") console.print(table) if verification["chain_valid"]: console.print(Panel.fit( "[green]✓ Audit log integrity verified[/green]\n" "All records are properly signed and chained.", title="Verification Successful" )) else: console.print(Panel.fit( "[red]✗ Audit log integrity check failed[/red]\n" "The log may have been tampered with.", title="Verification Failed" )) sys.exit(1) except Exception as e: console.print(f"[red]Error: {e}[/red]") sys.exit(1) @app.command() def stats( log_file: Path = typer.Argument( ..., help="Audit log file to analyze" ) ): """Show statistics about an audit log.""" try: if not log_file.exists(): console.print(f"[red]Error: Audit log file not found: {log_file}[/red]") sys.exit(1) audit_logger = get_audit_logger(log_file) stats = audit_logger.get_log_stats() if "error" in stats: console.print(f"[red]Error reading audit log: {stats['error']}[/red]") sys.exit(1) # Display general statistics general_table = Table(title="General Statistics") general_table.add_column("Metric", style="cyan") general_table.add_column("Value", style="green") general_table.add_row("Total Lines", str(stats["total_lines"])) general_table.add_row("File Size", f"{stats['file_size_bytes']} bytes") general_table.add_row("Last Modified", stats["last_modified"]) console.print(general_table) # Display record counts by tool and result if stats["record_counts"]: tool_table = Table(title="Records by Tool and Result") tool_table.add_column("Tool", style="cyan") tool_table.add_column("Result", style="yellow") tool_table.add_column("Count", style="green") for tool, results in stats["record_counts"].items(): for result, count in results.items(): tool_table.add_row(tool, result, str(count)) console.print(tool_table) except Exception as e: console.print(f"[red]Error: {e}[/red]") sys.exit(1) @app.command() def generate_keys( output_dir: Path = typer.Option( "./keys", "--output-dir", "-o", help="Output directory for generated keys" ) ): """Generate new audit signing keys.""" try: signer = create_audit_signer() private_key, public_key = signer.generate_keypair(output_dir) console.print(Panel.fit( f"[green]✓[/green] Generated new audit keypair:\n\n" f"Private key: [cyan]{private_key}[/cyan]\n" f"Public key: [cyan]{public_key}[/cyan]\n\n" f"[yellow]Warning: Keep the private key secure![/yellow]", title="Key Generation Successful" )) except Exception as e: console.print(f"[red]Error: {e}[/red]") sys.exit(1) @app.command() def extract_records( log_file: Path = typer.Argument( ..., help="Audit log file to extract from" ), output_file: Optional[Path] = typer.Option( None, "--output-file", "-o", help="Output file for extracted records" ), tool: Optional[str] = typer.Option( None, "--tool", "-t", help="Filter by specific tool" ), result: Optional[str] = typer.Option( None, "--result", "-r", help="Filter by specific result" ), actor: Optional[str] = typer.Option( None, "--actor", "-a", help="Filter by specific actor" ) ): """Extract and filter audit records.""" try: if not log_file.exists(): console.print(f"[red]Error: Audit log file not found: {log_file}[/red]") sys.exit(1) audit_logger = get_audit_logger(log_file) # Read and filter records with open(log_file, "r") as f: lines = f.readlines() filtered_records = [] for line in lines: try: import json data = json.loads(line.strip()) # Skip header lines if "type" in data and data["type"] == "audit_log_header": continue # Apply filters if tool and data.get("tool") != tool: continue if result and data.get("result") != result: continue if actor and data.get("actor") != actor: continue filtered_records.append(data) except json.JSONDecodeError: continue # Display results console.print(f"[green]Found {len(filtered_records)} matching records[/green]") if filtered_records: table = Table(title="Filtered Records") table.add_column("Timestamp", style="cyan") table.add_column("Actor", style="green") table.add_column("Tool", style="yellow") table.add_column("Profile", style="blue") table.add_column("Result", style="white") for record in filtered_records[:10]: # Show first 10 table.add_row( record.get("ts", "")[:19], # Truncate timestamp record.get("actor", ""), record.get("tool", ""), record.get("profile_id", ""), record.get("result", "") ) console.print(table) if len(filtered_records) > 10: console.print(f"[dim]... and {len(filtered_records) - 10} more records[/dim]") # Write to output file if specified if output_file: with open(output_file, "w") as f: for record in filtered_records: f.write(json.dumps(record) + "\n") console.print(f"[green]✓[/green] Wrote {len(filtered_records)} records to {output_file}") except Exception as e: console.print(f"[red]Error: {e}[/red]") sys.exit(1) if __name__ == "__main__": app()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keepithuman/openaccess-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server