
Continuo Memory System

by GtOkAi
auto_mode.py (10.1 kB)
""" Autonomous Mode Implements Observe → Plan → Act → Reflect cycle """ import asyncio import json import logging import uuid from datetime import datetime from enum import Enum from typing import Any, Callable, Optional logger = logging.getLogger(__name__) class AutoModePhase(Enum): """Autonomous mode cycle phases""" OBSERVE = "OBSERVE" PLAN = "PLAN" ACT = "ACT" REFLECT = "REFLECT" class AutoModeConfig: """Autonomous mode configuration""" def __init__( self, max_auto_edits: int = 10, max_iterations: int = 5, require_confirmation: bool = True, enabled_modes: Optional[list[str]] = None, rate_limit_seconds: int = 5, ): """ Args: max_auto_edits: Maximum automatic edits per session max_iterations: Maximum cycle iterations require_confirmation: Require confirmation before commits enabled_modes: Enabled modes (auto-debug, auto-doc, auto-plan) rate_limit_seconds: Minimum interval between actions """ self.max_auto_edits = max_auto_edits self.max_iterations = max_iterations self.require_confirmation = require_confirmation self.enabled_modes = enabled_modes or ["auto-debug", "auto-doc"] self.rate_limit_seconds = rate_limit_seconds class AutoModeLogger: """Structured logger for autonomous mode""" def __init__(self, log_file: str = "auto_mode.log"): """ Args: log_file: Path to log file """ self.log_file = log_file self.handler = logging.FileHandler(log_file) self.handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")) logger.addHandler(self.handler) def log_event( self, phase: AutoModePhase, context_details: str, source_context: Optional[dict[str, Any]] = None, ) -> str: """ Logs structured event Args: phase: Cycle phase context_details: Event description source_context: Additional context Returns: Unique event ID """ event_id = str(uuid.uuid4()) log_entry = { "event_timestamp": datetime.utcnow().isoformat() + "Z", "log_severity": "INFO", "action_phase": phase.value, "unique_event_id": event_id, "context_details": context_details, "source_context": source_context or {}, } logger.info(json.dumps(log_entry)) return event_id class AutoMode: """ Autonomous mode orchestrator Implements autonomous cycle with monitoring """ def __init__( self, memory_server, config: Optional[AutoModeConfig] = None, log_file: str = "auto_mode.log", ): """ Args: memory_server: MemoryServer instance config: Autonomous mode configuration log_file: Log file """ self.memory = memory_server self.config = config or AutoModeConfig() self.logger = AutoModeLogger(log_file) self.edit_count = 0 self.iteration_count = 0 self.running = False async def observe( self, event_source: Callable[[], Optional[dict[str, Any]]] ) -> Optional[dict[str, Any]]: """ OBSERVE phase: Collect project events Args: event_source: Function that returns events (e.g. 
build error) Returns: Detected event or None """ event = event_source() if not event: return None # Structured log event_id = self.logger.log_event( AutoModePhase.OBSERVE, f"Event detected: {event.get('type')} - {event.get('description')}", source_context={"file_path": event.get("file", "unknown")}, ) # Store in persistent memory self.memory.store( text=f"EVENT: {event['type']} — {event['description']}", metadata={ "event_id": event_id, "event_type": event["type"], "file": event.get("file"), }, level="N0", ) logger.info(f"[OBSERVE] {event['type']}: {event['description']}") return event async def plan( self, event: dict[str, Any], llm_planner: Callable[[str], dict[str, Any]], ) -> dict[str, Any]: """ PLAN phase: Generate action plan based on context Args: event: Observed event llm_planner: Function that calls planner Returns: Structured plan with tasks """ # Retrieve relevant context context = self.memory.retrieve(query=f"How to solve: {event['description']}?", top_k=5) # Build prompt prompt = self._build_planning_prompt(event, context) # Generate plan plan = llm_planner(prompt) # Structured log self.logger.log_event( AutoModePhase.PLAN, "Plan generated", source_context={"plan_structure": plan, "event_id": event.get("id")}, ) # Store plan in memory self.memory.store( text=f"PLAN: {json.dumps(plan)}", metadata={"event_id": event.get("id"), "plan_tasks": len(plan.get("tasks", []))}, level="N1", ) logger.info(f"[PLAN] Generated with {len(plan.get('tasks', []))} tasks") return plan def _build_planning_prompt(self, event: dict[str, Any], context: dict[str, Any]) -> str: """Build planning prompt""" context_docs = "\n".join(context.get("documents", [])) return f"""# Context Retrieved from Memory {context_docs} # Detected Event Type: {event['type']} Description: {event['description']} File: {event.get('file', 'N/A')} # Task Generate a structured JSON plan to resolve this event. 
Expected format: {{ "tasks": [ {{"action": "action_name", "params": {{}}, "description": "..."}} ], "reasoning": "Why this plan solves the problem" }} """ async def act( self, plan: dict[str, Any], tool_executor: Callable[[str, dict[str, Any]], Any], ) -> list[dict[str, Any]]: """ ACT phase: Execute plan tasks Args: plan: Plan generated in PLAN phase tool_executor: Function that executes MCP tools Returns: List of action results """ results = [] for task in plan.get("tasks", []): if self.edit_count >= self.config.max_auto_edits: logger.warning("[ACT] Edit limit reached") break # Rate limiting await asyncio.sleep(self.config.rate_limit_seconds) # Execute action action_result = tool_executor(task["action"], task.get("params", {})) # Structured log self.logger.log_event( AutoModePhase.ACT, f"Action executed: {task['action']}", source_context={ "action": task["action"], "execution_outcome": str(action_result)[:500], }, ) results.append( { "action": task["action"], "params": task.get("params"), "result": action_result, } ) self.edit_count += 1 logger.info( f"[ACT] {task['action']} executed ({self.edit_count}/{self.config.max_auto_edits})" ) return results async def reflect( self, event: dict[str, Any], plan: dict[str, Any], results: list[dict[str, Any]] ): """ REFLECT phase: Evaluate results and learn Args: event: Original event plan: Executed plan results: Action results """ # Generate iteration summary summary = { "event": event, "plan": plan, "results_count": len(results), "success": all(r.get("result") for r in results), } # Structured log self.logger.log_event( AutoModePhase.REFLECT, f"Iteration completed: {summary['success']}", source_context=summary, ) # Store learning self.memory.store( text=f"REFLECT: {event['type']} resolved with {len(results)} actions", metadata={"summary": summary}, level="N2", ) logger.info(f"[REFLECT] Success: {summary['success']}") async def run_cycle( self, event_source: Callable[[], Optional[dict[str, Any]]], llm_planner: Callable[[str], dict[str, Any]], tool_executor: Callable[[str, dict[str, Any]], Any], ): """ Execute complete cycle: Observe → Plan → Act → Reflect Args: event_source: Event source llm_planner: LLM planner tool_executor: Tool executor """ self.running = True self.iteration_count = 0 logger.info("Auto Mode started") while self.running and self.iteration_count < self.config.max_iterations: # 1. OBSERVE event = await self.observe(event_source) if not event: await asyncio.sleep(10) continue # 2. PLAN plan = await self.plan(event, llm_planner) # 3. ACT results = await self.act(plan, tool_executor) # 4. REFLECT await self.reflect(event, plan, results) self.iteration_count += 1 logger.info(f"Auto Mode finished after {self.iteration_count} iterations") def stop(self): """Stop the cycle""" self.running = False logger.info("Auto Mode interrupted")
