Skip to main content
Glama
beads_integration.py9.8 kB
"""Beads integration for consolidation agent coordination. This module provides helper functions for creating and querying beads issues used to coordinate consolidation agents. Uses subprocess calls to the `bd` CLI with `--json` flag for structured output. Design Decision (from research.md): Beads is designed as a CLI tool, not a Python library. Subprocess approach avoids importing beads internals and is consistent with how other tools (git, gh) are integrated. """ from __future__ import annotations import json import logging import subprocess from typing import Any logger = logging.getLogger(__name__) # Label conventions for consolidation issues AGENT_LABELS = { "decay": "consolidation:decay", "cluster": "consolidation:cluster", "merge": "consolidation:merge", "promote": "consolidation:promote", "relations": "consolidation:relations", } URGENCY_LABELS = { "high": "urgency:high", "medium": "urgency:medium", "low": "urgency:low", } # Priority mapping (beads uses 1-3, 1 = highest) URGENCY_PRIORITY = { "high": 1, "medium": 2, "low": 3, } class BeadsError(Exception): """Raised when beads CLI command fails.""" pass def _run_bd_command(args: list[str], check: bool = True) -> dict[str, Any] | list[Any]: """Run a beads CLI command and return parsed JSON output. Args: args: Command arguments (without 'bd' prefix) check: If True, raise BeadsError on non-zero exit Returns: Parsed JSON output from beads Raises: BeadsError: If command fails and check=True """ cmd = ["bd", *args, "--json"] logger.debug(f"Running beads command: {' '.join(cmd)}") try: result = subprocess.run( cmd, capture_output=True, text=True, check=False, # Handle errors ourselves ) if result.returncode != 0 and check: error_msg = result.stderr.strip() or result.stdout.strip() raise BeadsError(f"Beads command failed: {error_msg}") if not result.stdout.strip(): return {} parsed: dict[str, Any] | list[Any] = json.loads(result.stdout) return parsed except json.JSONDecodeError as e: raise BeadsError(f"Failed to parse beads output: {e}") from e except FileNotFoundError as e: raise BeadsError("beads CLI (bd) not found. Is it installed?") from e def create_consolidation_issue( agent: str, memory_ids: list[str], action: str, urgency: str = "medium", extra_data: dict[str, Any] | None = None, ) -> str: """Create a beads issue for consolidation work. Args: agent: Agent type (decay/cluster/merge/promote/relations) memory_ids: Memory UUIDs involved action: Recommended action urgency: high/medium/low extra_data: Additional JSON data for notes Returns: Beads issue ID Contract: - MUST set title as human-readable description - MUST set notes as JSON with memory_ids and agent - MUST add labels: consolidation:{agent}, urgency:{urgency} - MUST NOT create duplicate issues for same memory_ids Raises: BeadsError: If issue creation fails ValueError: If agent or urgency is invalid """ if agent not in AGENT_LABELS: raise ValueError(f"Invalid agent: {agent}. Must be one of {list(AGENT_LABELS.keys())}") if urgency not in URGENCY_LABELS: raise ValueError( f"Invalid urgency: {urgency}. Must be one of {list(URGENCY_LABELS.keys())}" ) # Check for existing issue with same memory_ids to prevent duplicates existing = query_consolidation_issues(agent=agent, status="open") for issue in existing: notes = issue.get("notes", {}) if isinstance(notes, str): try: notes = json.loads(notes) except json.JSONDecodeError: notes = {} existing_ids = set(notes.get("memory_ids", [])) if existing_ids == set(memory_ids): logger.info(f"Issue already exists for memory_ids: {issue['id']}") return str(issue["id"]) # Build human-readable title if len(memory_ids) == 1: title = f"{agent.title()}: Memory {memory_ids[0][:8]} - {action}" else: title = f"{agent.title()}: {len(memory_ids)} memories - {action}" # Build notes JSON notes_data: dict[str, Any] = { "memory_ids": memory_ids, "agent": agent, "action": action, } if extra_data: notes_data.update(extra_data) # Build labels labels = [AGENT_LABELS[agent], URGENCY_LABELS[urgency]] # Create issue via bd CLI args = [ "create", title, "--type", "task", "--priority", str(URGENCY_PRIORITY[urgency]), "--notes", json.dumps(notes_data), "--labels", ",".join(labels), ] result = _run_bd_command(args) if isinstance(result, dict) and "id" in result: issue_id = str(result["id"]) logger.info(f"Created consolidation issue: {issue_id}") return issue_id raise BeadsError(f"Unexpected response from beads: {result}") def query_consolidation_issues( agent: str | None = None, status: str = "open", urgency: str | None = None, ) -> list[dict[str, Any]]: """Query beads issues for consolidation work. Args: agent: Filter by agent type (None = all consolidation issues) status: Filter by status (open/in_progress/blocked/closed) urgency: Filter by urgency (None = all) Returns: List of issue dicts with id, title, notes, labels, status Contract: - MUST return empty list if no matches (not error) - MUST parse notes JSON for structured data - MUST respect all filters (AND logic) """ args = ["list", "--status", status] # Build label filter labels = [] if agent: if agent not in AGENT_LABELS: raise ValueError(f"Invalid agent: {agent}") labels.append(AGENT_LABELS[agent]) if urgency: if urgency not in URGENCY_LABELS: raise ValueError(f"Invalid urgency: {urgency}") labels.append(URGENCY_LABELS[urgency]) if labels: args.extend(["--labels", ",".join(labels)]) result = _run_bd_command(args, check=False) # Handle empty or error results if not result: return [] # Result might be a list or dict with "result" key issues: list[Any] if isinstance(result, list): issues = result elif isinstance(result, dict): inner = result.get("result", result.get("issues", [])) if isinstance(inner, list): issues = inner elif "id" in result: issues = [result] else: issues = [] else: return [] # Parse notes JSON for each issue for issue in issues: notes = issue.get("notes", "") if isinstance(notes, str) and notes: try: issue["notes"] = json.loads(notes) except json.JSONDecodeError: issue["notes"] = {"raw": notes} # Filter for consolidation issues if no agent specified if not agent: consolidation_labels = set(AGENT_LABELS.values()) issues = [ i for i in issues if any(label in consolidation_labels for label in i.get("labels", [])) ] return issues def claim_issue(issue_id: str) -> bool: """Claim an issue for processing. Args: issue_id: Beads issue ID Returns: True if successfully claimed, False if already claimed Contract: - MUST set status to in_progress - MUST fail gracefully if already in_progress - MUST NOT claim blocked issues """ # First check current status try: result = _run_bd_command(["show", issue_id]) if isinstance(result, dict): current_status = result.get("status", "") if current_status == "in_progress": logger.info(f"Issue {issue_id} already in_progress") return False if current_status == "blocked": logger.warning(f"Cannot claim blocked issue {issue_id}") return False except BeadsError: logger.warning(f"Could not check status for {issue_id}") # Update to in_progress try: _run_bd_command(["update", issue_id, "--status", "in_progress"]) logger.info(f"Claimed issue: {issue_id}") return True except BeadsError as e: logger.error(f"Failed to claim issue {issue_id}: {e}") return False def close_issue(issue_id: str, reason: str) -> None: """Close an issue after processing. Args: issue_id: Beads issue ID reason: Completion reason for audit trail Contract: - MUST set status to closed - MUST add reason to close message """ try: _run_bd_command(["close", issue_id, "--reason", reason]) logger.info(f"Closed issue {issue_id}: {reason}") except BeadsError as e: logger.error(f"Failed to close issue {issue_id}: {e}") raise def block_issue(issue_id: str, error: str) -> None: """Block an issue due to error. Args: issue_id: Beads issue ID error: Error message for debugging Contract: - MUST set status to blocked - MUST add error to issue notes """ try: _run_bd_command(["update", issue_id, "--status", "blocked", "--notes", error]) logger.info(f"Blocked issue {issue_id}: {error}") except BeadsError as e: logger.error(f"Failed to block issue {issue_id}: {e}") raise

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/prefrontalsys/mnemex'

If you have feedback or need assistance with the MCP directory API, please join our Discord server