# Research: Multi-Agent Memory Consolidation

**Phase**: 0 (Research) | **Date**: 2025-11-24

## Research Tasks

### 1. Beads Integration Patterns

**Question**: How should Python code interact with beads for issue creation and querying?

**Decision**: Use subprocess calls to `bd` CLI with `--json` flag for structured output

**Rationale**:
- Beads is designed as a CLI tool, not a Python library
- `--json` flag provides structured output for parsing
- Subprocess approach avoids importing beads internals
- Consistent with how other tools (git, gh) are integrated

**Alternatives Considered**:
- Direct SQLite access: Rejected - bypasses beads business logic, fragile
- Python bindings: Don't exist, would need to create/maintain
- REST API: Beads doesn't expose one

**Implementation Pattern**:

```python
import subprocess
import json

def create_beads_issue(title: str, notes: dict, labels: list[str]) -> str:
    """Create a beads issue and return its ID."""
    result = subprocess.run(
        ["bd", "create",
         "--title", title,
         "--notes", json.dumps(notes),
         "--labels", ",".join(labels),
         "--json"],
        capture_output=True, text=True, check=True
    )
    return json.loads(result.stdout)["id"]

def query_beads_issues(labels: list[str]) -> list[dict]:
    """Query beads issues by label."""
    result = subprocess.run(
        ["bd", "list", "--labels", ",".join(labels), "--json"],
        capture_output=True, text=True, check=True
    )
    return json.loads(result.stdout)["result"]
```

---

### 2. Agent Base Class Design

**Question**: What interface should ConsolidationAgent provide?
**Decision**: Abstract base class with `run()`, `dry_run()`, `process_item()` methods

**Rationale**:
- Consistent interface across all five agents
- Dry-run mode is a spec requirement (FR-010)
- `process_item()` enables rate limiting at item level
- Abstract base enforces implementation

**Alternatives Considered**:
- Protocol/interface only: Less enforcement, easier to miss methods
- Composition over inheritance: More complex for simple use case
- No base class: Code duplication, inconsistent behavior

**Interface Design**:

```python
from abc import ABC, abstractmethod
from typing import TypeVar, Generic
from pydantic import BaseModel

T = TypeVar("T", bound=BaseModel)

class ConsolidationAgent(ABC, Generic[T]):
    """Base class for all consolidation agents."""

    def __init__(self, dry_run: bool = False, rate_limit: int = 100):
        self.dry_run = dry_run
        self.rate_limit = rate_limit
        self._processed_count = 0

    @abstractmethod
    def scan(self) -> list[str]:
        """Scan for items needing processing. Returns memory IDs."""
        ...

    @abstractmethod
    def process_item(self, memory_id: str) -> T:
        """Process a single item. Returns result model."""
        ...

    def _log_to_beads(self, memory_id: str, result: T) -> None:
        """Record the processing result as a beads issue (see section 1)."""
        ...

    def run(self) -> list[T]:
        """Run agent on all scanned items."""
        items = self.scan()
        results = []
        for item_id in items:
            if self._processed_count >= self.rate_limit:
                break
            result = self.process_item(item_id)
            if not self.dry_run:
                self._log_to_beads(item_id, result)
            results.append(result)
            self._processed_count += 1
        return results
```

---

### 3. Existing Core Module Integration

**Question**: How should agents call existing `core/` functions?
**Decision**: Direct Python imports from `cortexgraph.core`

**Rationale**:
- Existing modules are well-tested (100% coverage on consolidation.py)
- Direct imports are fastest, no overhead
- Type hints preserved across call boundaries
- Matches existing pattern (tools import from core)

**Integration Points**:

| Agent | Core Module | Functions Used |
|-------|-------------|----------------|
| Decay Analyzer | `core.decay` | `calculate_score()`, `is_in_danger_zone()` |
| Decay Analyzer | `core.review` | `get_review_priority()` |
| Cluster Detector | `core.clustering` | `find_similar_memories()`, `calculate_cohesion()` |
| Semantic Merge | `core.consolidation` | `merge_memories()`, `create_consolidated_content()` |
| LTM Promoter | `core.scoring` | `should_promote()` |
| LTM Promoter | `vault.writer` | `write_markdown()` |
| Relationship Discovery | `core.clustering` | `calculate_similarity()` |

---

### 4. Hybrid Triggering Implementation

**Question**: How should scheduled vs event-driven triggers work?

**Decision**: Scheduler class with cron-like intervals + hook into `save_memory` for urgent checks

**Rationale**:
- Scheduled: Simple cron-like execution (hourly default)
- Event-driven: Check decay score after `save_memory`, trigger if < 0.10
- Separation of concerns: Scheduler handles timing, agents handle logic

**Alternatives Considered**:
- Pure scheduled: Misses urgent decay situations
- Pure event-driven: Complex hooks into every operation
- Background daemon: Overkill for local CLI tool

**Implementation Approach**:

```python
# Scheduled (via cron or launchd)
# crontab: 0 * * * * cortexgraph-consolidate run --all

# Event trigger (hook after save_memory)
def post_save_check(memory_id: str) -> None:
    """Check if newly saved memory needs urgent attention."""
    score = calculate_score(memory_id)
    if score < 0.10:
        create_urgent_beads_issue(memory_id, score)
```

---

### 5. Confidence-Based Processing

**Question**: How should confidence thresholds affect processing flow?
**Decision**: Three-tier system with configurable thresholds

**Rationale**:
- Matches clarification decision (≥0.9 auto, 0.7-0.9 log, <0.7 wait)
- Aligns with existing `consolidate_memories` pattern (cohesion > 0.75)
- Configurable via environment variables

**Implementation**:

```python
from enum import Enum

class ProcessingDecision(Enum):
    AUTO = "auto"        # confidence >= 0.9
    LOG_ONLY = "log"     # 0.7 <= confidence < 0.9
    WAIT_HUMAN = "wait"  # confidence < 0.7

def decide_processing(confidence: float) -> ProcessingDecision:
    if confidence >= 0.9:
        return ProcessingDecision.AUTO
    elif confidence >= 0.7:
        return ProcessingDecision.LOG_ONLY
    else:
        return ProcessingDecision.WAIT_HUMAN
```

---

### 6. Rate Limiting Strategy

**Question**: How should the 100 ops/minute rate limit be enforced?

**Decision**: Token bucket algorithm with per-agent limits

**Rationale**:
- Simple, well-understood algorithm
- Per-agent limits allow parallelism while preventing storms
- Configurable via `CORTEXGRAPH_AGENT_RATE_LIMIT`

**Implementation**:

```python
import time
from collections import deque

class RateLimiter:
    # Implemented as a sliding-window log of operation timestamps,
    # enforcing at most max_ops operations per window.
    def __init__(self, max_ops: int = 100, window_seconds: int = 60):
        self.max_ops = max_ops
        self.window = window_seconds
        self.timestamps: deque[float] = deque()

    def acquire(self) -> bool:
        """Try to acquire a token. Returns True if allowed."""
        now = time.time()
        # Remove timestamps older than the window
        while self.timestamps and self.timestamps[0] < now - self.window:
            self.timestamps.popleft()
        if len(self.timestamps) < self.max_ops:
            self.timestamps.append(now)
            return True
        return False

    def wait_and_acquire(self) -> None:
        """Block until a token is available."""
        while not self.acquire():
            time.sleep(0.1)
```

---

## Summary of Decisions

| Topic | Decision | Risk Level |
|-------|----------|------------|
| Beads integration | Subprocess `bd` CLI calls | Low |
| Agent base class | ABC with Generic result types | Low |
| Core integration | Direct Python imports | Low |
| Triggering | Hybrid (scheduled + event hooks) | Medium |
| Confidence handling | Three-tier thresholds | Low |
| Rate limiting | Token bucket per agent | Low |

**All NEEDS CLARIFICATION items resolved.** Ready for Phase 1 design.
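To show how the decisions above compose, here is a minimal end-to-end sketch of one agent driving a scan through the rate limiter and the three-tier confidence decision. It condenses the `RateLimiter` and thresholds from sections 5-6 into one runnable file; `DecayAnalyzer`, `DecayReport`, the dataclass result type (standing in for the pydantic model), and the stubbed scores are hypothetical illustrations, not existing code.

```python
import time
from collections import deque
from dataclasses import dataclass

class RateLimiter:
    """Sliding-window limiter: at most max_ops operations per window."""
    def __init__(self, max_ops: int = 100, window_seconds: int = 60):
        self.max_ops = max_ops
        self.window = window_seconds
        self.timestamps: deque = deque()

    def acquire(self) -> bool:
        now = time.time()
        while self.timestamps and self.timestamps[0] < now - self.window:
            self.timestamps.popleft()
        if len(self.timestamps) < self.max_ops:
            self.timestamps.append(now)
            return True
        return False

def decide_processing(confidence: float) -> str:
    """Three-tier thresholds from section 5."""
    if confidence >= 0.9:
        return "auto"
    elif confidence >= 0.7:
        return "log"
    return "wait"

@dataclass
class DecayReport:
    memory_id: str
    score: float
    decision: str

class DecayAnalyzer:
    """Concrete agent sketch: scan -> rate limit -> confidence decision."""
    def __init__(self, dry_run: bool = True):
        self.dry_run = dry_run
        self.limiter = RateLimiter(max_ops=100, window_seconds=60)

    def scan(self) -> dict[str, float]:
        # Stubbed: a real agent would query storage and core.decay scores.
        return {"mem-001": 0.95, "mem-002": 0.80, "mem-003": 0.40}

    def run(self) -> list[DecayReport]:
        reports = []
        for memory_id, score in self.scan().items():
            if not self.limiter.acquire():
                break  # hit the per-agent rate limit for this window
            reports.append(DecayReport(memory_id, score, decide_processing(score)))
            # In non-dry-run mode, "auto" results would be applied and
            # every result logged to beads (section 1).
        return reports

reports = DecayAnalyzer().run()
print([(r.memory_id, r.decision) for r in reports])
# [('mem-001', 'auto'), ('mem-002', 'log'), ('mem-003', 'wait')]
```

Running in dry-run mode first, as FR-010 requires, lets each tier's behavior be inspected before any "auto" action is applied.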
