# auto_mode.py
"""
Autonomous Mode
Implements Observe → Plan → Act → Reflect cycle
"""
import asyncio
import inspect
import json
import logging
import os
import uuid
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable, Optional
# Module-level logger used by the classes in this file; AutoModeLogger
# attaches its file handler to this same logger object.
logger = logging.getLogger(__name__)
class AutoModePhase(Enum):
    """Stages of one autonomous-mode iteration, listed in execution order.

    Each member's value is the upper-case label that is written to the
    structured log as the ``action_phase`` field.
    """

    # Collect an event from the project.
    OBSERVE = "OBSERVE"
    # Turn the event into a structured plan.
    PLAN = "PLAN"
    # Execute the plan's tasks.
    ACT = "ACT"
    # Evaluate the outcome and record it.
    REFLECT = "REFLECT"
class AutoModeConfig:
    """Autonomous mode configuration: limits and switches for one session."""

    # Modes used when the caller does not specify ``enabled_modes``.
    DEFAULT_ENABLED_MODES = ("auto-debug", "auto-doc")

    def __init__(
        self,
        max_auto_edits: int = 10,
        max_iterations: int = 5,
        require_confirmation: bool = True,
        enabled_modes: Optional[list[str]] = None,
        rate_limit_seconds: int = 5,
    ):
        """
        Args:
            max_auto_edits: Maximum automatic edits per session
            max_iterations: Maximum cycle iterations
            require_confirmation: Require confirmation before commits
            enabled_modes: Enabled modes (auto-debug, auto-doc, auto-plan).
                ``None`` selects the defaults; an explicit empty list means
                "no mode enabled" and is kept as given.
            rate_limit_seconds: Minimum interval between actions
        """
        self.max_auto_edits = max_auto_edits
        self.max_iterations = max_iterations
        self.require_confirmation = require_confirmation
        # Distinguish "not specified" (None) from "explicitly empty" ([]), and
        # copy so later mutation of the caller's list cannot alter the config.
        if enabled_modes is None:
            self.enabled_modes = list(self.DEFAULT_ENABLED_MODES)
        else:
            self.enabled_modes = list(enabled_modes)
        self.rate_limit_seconds = rate_limit_seconds

    def __repr__(self) -> str:
        """Return a constructor-style representation for debugging."""
        return (
            f"{type(self).__name__}("
            f"max_auto_edits={self.max_auto_edits!r}, "
            f"max_iterations={self.max_iterations!r}, "
            f"require_confirmation={self.require_confirmation!r}, "
            f"enabled_modes={self.enabled_modes!r}, "
            f"rate_limit_seconds={self.rate_limit_seconds!r})"
        )
class AutoModeLogger:
    """Structured logger for autonomous mode.

    Each event is serialized as one JSON object and emitted at INFO level on
    this module's logger, which has a file handler for ``log_file`` attached.

    NOTE: records reach the file only when INFO is enabled for this module's
    logger; this class does not change logger levels.
    """

    def __init__(self, log_file: str = "auto_mode.log"):
        """
        Args:
            log_file: Path to log file
        """
        self.log_file = log_file
        # Same logger object as the module-level ``logger``.
        self._logger = logging.getLogger(__name__)
        self.handler = self._attach_file_handler(log_file)

    def _attach_file_handler(self, log_file: str) -> logging.FileHandler:
        """Return the file handler for ``log_file``, creating it only once.

        Reusing an already-attached handler prevents duplicated log lines and
        leaked file descriptors when several instances target the same file.
        """
        target = os.path.abspath(log_file)
        for existing in self._logger.handlers:
            if isinstance(existing, logging.FileHandler) and existing.baseFilename == target:
                return existing
        # Explicit UTF-8 so non-ASCII messages do not depend on the locale.
        handler = logging.FileHandler(log_file, encoding="utf-8")
        handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
        self._logger.addHandler(handler)
        return handler

    def close(self) -> None:
        """Detach the file handler from the logger and close the file.

        The handler is shared by every instance that targets the same file,
        so closing it stops file output for all of them.
        """
        self._logger.removeHandler(self.handler)
        self.handler.close()

    def log_event(
        self,
        phase: "AutoModePhase",
        context_details: str,
        source_context: Optional[dict[str, Any]] = None,
    ) -> str:
        """
        Logs structured event

        Args:
            phase: Cycle phase
            context_details: Event description
            source_context: Additional context

        Returns:
            Unique event ID
        """
        event_id = str(uuid.uuid4())
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # tzinfo is dropped so the "...Z" output format stays unchanged.
        timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
        log_entry = {
            "event_timestamp": timestamp,
            "log_severity": "INFO",
            "action_phase": phase.value,
            "unique_event_id": event_id,
            "context_details": context_details,
            "source_context": source_context or {},
        }
        # default=str: logging is best-effort and must not abort the cycle
        # because a context value is not JSON-native.
        self._logger.info(json.dumps(log_entry, default=str))
        return event_id
class AutoMode:
    """
    Autonomous mode orchestrator

    Runs the Observe → Plan → Act → Reflect cycle, writing a structured log
    entry and a memory record in every phase. The callbacks passed to the
    phases (event source, planner, tool executor) may be plain functions or
    coroutine functions; awaitable results are awaited.
    """

    def __init__(
        self,
        memory_server,
        config: Optional["AutoModeConfig"] = None,
        log_file: str = "auto_mode.log",
    ):
        """
        Args:
            memory_server: MemoryServer instance (must provide ``store`` and
                ``retrieve``)
            config: Autonomous mode configuration; defaults are used if None
            log_file: Log file
        """
        self.memory = memory_server
        self.config = AutoModeConfig() if config is None else config
        self.logger = AutoModeLogger(log_file)
        self.edit_count = 0
        self.iteration_count = 0
        self.running = False

    @staticmethod
    async def _maybe_await(value: Any) -> Any:
        """Return ``value``, awaiting it first when it is awaitable."""
        if inspect.isawaitable(value):
            return await value
        return value

    @staticmethod
    def _all_succeeded(results: list[dict[str, Any]]) -> bool:
        """Return True only if there is at least one result and all are truthy.

        ``all()`` alone is True for an empty list, which would report an
        iteration that executed nothing as a success.
        """
        return bool(results) and all(r.get("result") for r in results)

    async def observe(
        self, event_source: Callable[[], Optional[dict[str, Any]]]
    ) -> Optional[dict[str, Any]]:
        """
        OBSERVE phase: Collect project events

        Args:
            event_source: Function that returns events (e.g. build error);
                may be sync or async

        Returns:
            Detected event or None. When the source did not supply an "id",
            the returned event is a shallow copy carrying the id generated
            for the OBSERVE log entry, so later phases can correlate with it.

        Raises:
            KeyError: If the event lacks "type" or "description".
        """
        event = await self._maybe_await(event_source())
        if not event:
            return None
        # Structured log
        event_id = self.logger.log_event(
            AutoModePhase.OBSERVE,
            f"Event detected: {event.get('type')} - {event.get('description')}",
            source_context={"file_path": event.get("file", "unknown")},
        )
        # PLAN reads event["id"]; fill it in on a copy so the source's own
        # dict is left untouched.
        if event.get("id") is None:
            event = {**event, "id": event_id}
        # Store in persistent memory
        self.memory.store(
            text=f"EVENT: {event['type']} — {event['description']}",
            metadata={
                "event_id": event_id,
                "event_type": event["type"],
                "file": event.get("file"),
            },
            level="N0",
        )
        logger.info("[OBSERVE] %s: %s", event["type"], event["description"])
        return event

    async def plan(
        self,
        event: dict[str, Any],
        llm_planner: Callable[[str], dict[str, Any]],
    ) -> dict[str, Any]:
        """
        PLAN phase: Generate action plan based on context

        Args:
            event: Observed event
            llm_planner: Function that calls planner with the prompt; may be
                sync or async

        Returns:
            Structured plan with tasks
        """
        # Retrieve relevant context
        context = self.memory.retrieve(query=f"How to solve: {event['description']}?", top_k=5)
        # Build prompt
        prompt = self._build_planning_prompt(event, context)
        # Generate plan
        plan = await self._maybe_await(llm_planner(prompt))
        # A planner may return "tasks": null; treat that like no tasks.
        task_count = len(plan.get("tasks") or [])
        # Structured log
        self.logger.log_event(
            AutoModePhase.PLAN,
            "Plan generated",
            source_context={"plan_structure": plan, "event_id": event.get("id")},
        )
        # Store plan in memory
        self.memory.store(
            text=f"PLAN: {json.dumps(plan)}",
            metadata={"event_id": event.get("id"), "plan_tasks": task_count},
            level="N1",
        )
        logger.info("[PLAN] Generated with %s tasks", task_count)
        return plan

    def _build_planning_prompt(self, event: dict[str, Any], context: dict[str, Any]) -> str:
        """Build planning prompt from the event and the retrieved context.

        NOTE(review): assumes ``context["documents"]`` is a flat list of
        strings — confirm against the memory server's ``retrieve`` result.
        """
        context_docs = "\n".join(context.get("documents", []))
        return f"""# Context Retrieved from Memory
{context_docs}
# Detected Event
Type: {event['type']}
Description: {event['description']}
File: {event.get('file', 'N/A')}
# Task
Generate a structured JSON plan to resolve this event.
Expected format:
{{
"tasks": [
{{"action": "action_name", "params": {{}}, "description": "..."}}
],
"reasoning": "Why this plan solves the problem"
}}
"""

    async def act(
        self,
        plan: dict[str, Any],
        tool_executor: Callable[[str, dict[str, Any]], Any],
    ) -> list[dict[str, Any]]:
        """
        ACT phase: Execute plan tasks

        Stops early once ``config.max_auto_edits`` actions have been executed
        by this instance, and sleeps ``config.rate_limit_seconds`` before
        every action.

        Args:
            plan: Plan generated in PLAN phase
            tool_executor: Function that executes MCP tools; may be sync or
                async

        Returns:
            List of action results
        """
        results = []
        for task in plan.get("tasks") or []:
            if self.edit_count >= self.config.max_auto_edits:
                logger.warning("[ACT] Edit limit reached")
                break
            # Rate limiting
            await asyncio.sleep(self.config.rate_limit_seconds)
            # Execute action
            action_result = await self._maybe_await(
                tool_executor(task["action"], task.get("params", {}))
            )
            # Structured log (outcome truncated to keep entries small)
            self.logger.log_event(
                AutoModePhase.ACT,
                f"Action executed: {task['action']}",
                source_context={
                    "action": task["action"],
                    "execution_outcome": str(action_result)[:500],
                },
            )
            results.append(
                {
                    "action": task["action"],
                    "params": task.get("params"),
                    "result": action_result,
                }
            )
            self.edit_count += 1
            logger.info(
                "[ACT] %s executed (%s/%s)",
                task["action"],
                self.edit_count,
                self.config.max_auto_edits,
            )
        return results

    async def reflect(
        self, event: dict[str, Any], plan: dict[str, Any], results: list[dict[str, Any]]
    ):
        """
        REFLECT phase: Evaluate results and learn

        The iteration counts as successful only when at least one action ran
        and every action returned a truthy result.

        Args:
            event: Original event
            plan: Executed plan
            results: Action results
        """
        success = self._all_succeeded(results)
        # Generate iteration summary
        summary = {
            "event": event,
            "plan": plan,
            "results_count": len(results),
            "success": success,
        }
        # Structured log
        self.logger.log_event(
            AutoModePhase.REFLECT,
            f"Iteration completed: {summary['success']}",
            source_context=summary,
        )
        # Store learning; do not record a failed iteration as "resolved".
        outcome = "resolved" if success else "not resolved"
        self.memory.store(
            text=f"REFLECT: {event['type']} {outcome} with {len(results)} actions",
            metadata={"summary": summary},
            level="N2",
        )
        logger.info("[REFLECT] Success: %s", success)

    async def run_cycle(
        self,
        event_source: Callable[[], Optional[dict[str, Any]]],
        llm_planner: Callable[[str], dict[str, Any]],
        tool_executor: Callable[[str, dict[str, Any]], Any],
        idle_poll_seconds: float = 10,
    ):
        """
        Execute complete cycle: Observe → Plan → Act → Reflect

        Repeats until ``stop()`` is called or ``config.max_iterations``
        iterations have completed. Polls that find no event do not count as
        iterations.

        Args:
            event_source: Event source
            llm_planner: LLM planner
            tool_executor: Tool executor
            idle_poll_seconds: Seconds to wait before polling again when the
                event source returned nothing
        """
        self.running = True
        self.iteration_count = 0
        logger.info("Auto Mode started")
        try:
            while self.running and self.iteration_count < self.config.max_iterations:
                # 1. OBSERVE
                event = await self.observe(event_source)
                if not event:
                    await asyncio.sleep(idle_poll_seconds)
                    continue
                # 2. PLAN
                plan = await self.plan(event, llm_planner)
                # 3. ACT
                results = await self.act(plan, tool_executor)
                # 4. REFLECT
                await self.reflect(event, plan, results)
                self.iteration_count += 1
        finally:
            # Keep the flag accurate when the loop ends by itself or raises.
            self.running = False
        logger.info("Auto Mode finished after %s iterations", self.iteration_count)

    def stop(self):
        """Stop the cycle after the phase currently in progress."""
        self.running = False
        logger.info("Auto Mode interrupted")