consolidate.py
"""CLI for consolidation pipeline operations. This module provides the `cortexgraph-consolidate` command for running consolidation agents (decay, cluster, merge, promote, relations) either individually or as a full pipeline. Examples: # Run single agent cortexgraph-consolidate run decay --dry-run # Run full pipeline cortexgraph-consolidate run --all # Check queue status cortexgraph-consolidate status --json # Process specific beads issue cortexgraph-consolidate process cortexgraph-abc """ from __future__ import annotations import argparse import json import logging import sys from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from cortexgraph.agents.base import ConsolidationAgent from cortexgraph.storage.jsonl_storage import JSONLStorage logger = logging.getLogger(__name__) # Pipeline execution order AGENT_ORDER = ["decay", "cluster", "merge", "promote", "relations"] def get_storage() -> JSONLStorage: """Get storage instance for agents. Returns: Configured JSONLStorage instance """ from cortexgraph.context import get_db return get_db() def get_agent(name: str, dry_run: bool = False) -> ConsolidationAgent[Any]: """Factory function to create agent instances. Args: name: Agent name (decay, cluster, merge, promote, relations) dry_run: If True, agent will preview without modifying data Returns: Configured agent instance Raises: ValueError: If agent name is unknown """ if name == "decay": from cortexgraph.agents.decay_analyzer import DecayAnalyzer return DecayAnalyzer(dry_run=dry_run) elif name == "cluster": from cortexgraph.agents.cluster_detector import ClusterDetector return ClusterDetector(dry_run=dry_run) elif name == "merge": from cortexgraph.agents.semantic_merge import SemanticMerge return SemanticMerge(dry_run=dry_run) elif name == "promote": from cortexgraph.agents.ltm_promoter import LTMPromoter return LTMPromoter(dry_run=dry_run) elif name == "relations": from cortexgraph.agents.relationship_discovery import RelationshipDiscovery return RelationshipDiscovery(dry_run=dry_run) else: raise ValueError(f"Unknown agent: {name}") def get_queue_status() -> dict[str, int]: """Get current consolidation queue status. Returns: Dictionary with queue counts (pending, in_progress, completed, failed) """ # TODO: Implement actual queue tracking with beads integration return { "pending": 0, "in_progress": 0, "completed": 0, "failed": 0, } def process_issue(issue_id: str, dry_run: bool = False) -> dict[str, Any]: """Process a specific beads issue. Args: issue_id: Beads issue identifier dry_run: If True, preview without executing Returns: Result dictionary with success status and details Raises: ValueError: If issue not found """ # TODO: Implement actual beads integration # For now, raise ValueError for nonexistent issues if issue_id.startswith("nonexistent"): raise ValueError(f"Issue not found: {issue_id}") return { "success": True, "issue_id": issue_id, "action": "processed", "dry_run": dry_run, } def cmd_run(agent: str, dry_run: bool = False, json_output: bool = False) -> int: """Run a single consolidation agent. 
Args: agent: Agent name (decay, cluster, merge, promote, relations) dry_run: If True, preview without modifying data json_output: If True, format output as JSON Returns: Exit code (0 for success, non-zero for error) """ try: agent_instance = get_agent(agent, dry_run=dry_run) except ValueError as e: if json_output: print(json.dumps({"error": str(e)})) else: print(f"Error: {e}") return 1 try: results = agent_instance.run() if json_output: # Convert results to JSON-serializable format if results: output = [r.to_dict() if hasattr(r, "to_dict") else str(r) for r in results] else: output = [] print(json.dumps({"agent": agent, "results": output})) else: print(f"Agent '{agent}' completed successfully") if results: print(f" Processed {len(results)} item(s)") return 0 except Exception as e: logger.exception(f"Agent '{agent}' failed") if json_output: print(json.dumps({"error": str(e), "agent": agent})) else: print(f"Error running agent '{agent}': {e}") return 1 def cmd_run_all(dry_run: bool = False, json_output: bool = False) -> int: """Run all consolidation agents in pipeline order. Pipeline order: decay → cluster → merge → promote → relations Args: dry_run: If True, preview without modifying data json_output: If True, format output as JSON Returns: Exit code (0 for success, non-zero for error) """ all_results: dict[str, list[Any]] = {} for agent_name in AGENT_ORDER: try: agent = get_agent(agent_name, dry_run=dry_run) results = agent.run() all_results[agent_name] = ( [r.to_dict() if hasattr(r, "to_dict") else str(r) for r in results] if results else [] ) if not json_output: print(f"✓ {agent_name}: processed {len(results) if results else 0} items") except Exception as e: logger.exception(f"Pipeline failed at agent '{agent_name}'") if json_output: print( json.dumps({"error": str(e), "failed_at": agent_name, "results": all_results}) ) else: print(f"✗ Pipeline failed at '{agent_name}': {e}") return 1 if json_output: print(json.dumps({"success": True, "results": all_results})) else: print("\n✅ Full pipeline completed successfully") return 0 def cmd_status(json_output: bool = False) -> tuple[int, str]: """Get current consolidation queue status. Args: json_output: If True, format output as JSON Returns: Tuple of (exit_code, output_string) """ status = get_queue_status() if json_output: output = json.dumps(status) else: output = ( f"Consolidation Queue Status:\n" f" Pending: {status['pending']}\n" f" In Progress: {status['in_progress']}\n" f" Completed: {status['completed']}\n" f" Failed: {status['failed']}" ) print(output) return 0, output def cmd_process(issue_id: str, dry_run: bool = False, json_output: bool = False) -> int: """Process a specific beads issue. Args: issue_id: Beads issue identifier dry_run: If True, preview without executing json_output: If True, format output as JSON Returns: Exit code (0 for success, non-zero for error) """ try: result = process_issue(issue_id, dry_run=dry_run) if json_output: print(json.dumps(result)) else: print(f"Processed issue: {issue_id}") print(f" Action: {result.get('action', 'unknown')}") if dry_run: print(" [DRY RUN - no changes made]") return 0 except ValueError as e: if json_output: print(json.dumps({"error": str(e), "issue_id": issue_id})) else: print(f"Error: {e}") return 1 def main() -> int: """Main CLI entry point. 
Returns: Exit code (0 for success, non-zero for error) """ parser = argparse.ArgumentParser( prog="cortexgraph-consolidate", description="Run consolidation pipeline operations", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Run single agent cortexgraph-consolidate run decay --dry-run # Run full pipeline cortexgraph-consolidate run --all # Check queue status cortexgraph-consolidate status --json # Process specific beads issue cortexgraph-consolidate process cortexgraph-abc --dry-run """, ) # Global options parser.add_argument( "--json", dest="json_output", action="store_true", help="Output results as JSON", ) subparsers = parser.add_subparsers(dest="command", help="Available commands") # Run command run_parser = subparsers.add_parser("run", help="Run consolidation agent(s)") run_parser.add_argument( "agent", nargs="?", choices=AGENT_ORDER, help="Agent to run (decay, cluster, merge, promote, relations)", ) run_parser.add_argument( "--all", dest="run_all", action="store_true", help="Run full consolidation pipeline", ) run_parser.add_argument( "--dry-run", action="store_true", help="Preview without making changes", ) # Status command status_parser = subparsers.add_parser("status", help="Show queue status") status_parser.add_argument( "--json", dest="status_json", action="store_true", help="Output as JSON", ) # Process command process_parser = subparsers.add_parser("process", help="Process specific issue") process_parser.add_argument( "issue_id", help="Beads issue ID to process", ) process_parser.add_argument( "--dry-run", action="store_true", help="Preview without making changes", ) args = parser.parse_args() # Handle commands if args.command == "run": if args.run_all: return cmd_run_all(dry_run=args.dry_run, json_output=args.json_output) elif args.agent: return cmd_run(agent=args.agent, dry_run=args.dry_run, json_output=args.json_output) else: run_parser.print_help() return 1 elif args.command == "status": # Combine both --json flags json_out = args.json_output or getattr(args, "status_json", False) result, _ = cmd_status(json_output=json_out) return result elif args.command == "process": return cmd_process( issue_id=args.issue_id, dry_run=args.dry_run, json_output=args.json_output, ) else: parser.print_help() return 1 if __name__ == "__main__": sys.exit(main())
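The same entry points can also be driven from Python rather than through the console script, for example from a scheduled job. A minimal sketch follows; the import path `cortexgraph.cli.consolidate` is an assumption about where this module lives in the package and should be adjusted to the real layout.

    # Hypothetical import path for this module; adjust to the actual package layout.
    from cortexgraph.cli.consolidate import cmd_run_all, cmd_status

    # Preview the full pipeline (decay → cluster → merge → promote → relations)
    # without modifying any data.
    exit_code = cmd_run_all(dry_run=True)

    # Print the current queue counts as JSON and capture the output string.
    status_code, status_text = cmd_status(json_output=True)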

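For reference, a sketch of the JSON emitted by a successful `cortexgraph-consolidate --json run --all`, as constructed in `cmd_run_all` above (the global `--json` flag is defined on the top-level parser, so it goes before the subcommand). The per-agent lists hold each result's `to_dict()` output, or its string form, and are shown empty here for brevity:

    {"success": true, "results": {"decay": [], "cluster": [], "merge": [], "promote": [], "relations": []}}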