"""
BridgeService - Converts thoughts into executable operations
=============================================================
The bridge between thinking and doing. Analyzes a completed thinking
session and generates executable operations for the doing service.
This completes the cognitive loop: Think -> Bridge -> Do -> Done
"""
import re
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
from ..config import get_config
from ..logger import get_logger
from ..utils.errors import BridgeError, ValidationError
from ..utils.responses import bridge_response, success_response, error_response
@dataclass
class ActionPlan:
    """A set of operations generated from a thinking session.

    Bundles the operation list with the bookkeeping flags that record
    whether the plan has been validated and whether it has been executed.
    """

    # Identifier of this plan and of the thinking session it belongs to
    plan_id: str
    session_id: str
    # Operation dicts that make up the plan
    operations: List[Dict[str, Any]]
    # Set at construction time via datetime.utcnow (naive datetime)
    created_at: datetime = field(default_factory=datetime.utcnow)
    # Validation outcome and the messages collected while validating
    validated: bool = False
    validation_errors: List[str] = field(default_factory=list)
    # Execution state and whatever the executor returned
    executed: bool = False
    execution_result: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict summary of the plan.

        ``created_at`` is rendered with ``isoformat()``, an
        ``operation_count`` is derived from the operation list, and
        ``ready_to_execute`` is true only for a validated plan that has not
        been executed yet. ``execution_result`` is not part of the summary.
        """
        pending = self.validated and not self.executed
        summary: Dict[str, Any] = dict(
            plan_id=self.plan_id,
            session_id=self.session_id,
            operations=self.operations,
            operation_count=len(self.operations),
            created_at=self.created_at.isoformat(),
            validated=self.validated,
            validation_errors=self.validation_errors,
            executed=self.executed,
            ready_to_execute=pending,
        )
        return summary
class BridgeService:
    """
    Converts thinking sessions into executable action plans

    Features:
    - Analyzes concluded thinking sessions
    - Generates operation sequences from thoughts
    - Validates operations before execution
    - Supports manual operation definition
    - Links thinking and doing with clear audit trail

    Generated plans are kept in memory in ``self.plans``, keyed by plan ID.
    """

    # Keywords that suggest specific operations.
    # Layout: service name -> method name -> phrases that hint at that method.
    # Phrases are matched as lowercase substrings of a thought's content.
    OPERATION_KEYWORDS = {
        'file': {
            'read': ['read file', 'load file', 'get content', 'open file'],
            'write': ['write file', 'save file', 'create file', 'output to'],
            'exists': ['check if', 'file exists', 'verify file']
        },
        'shell': {
            'run': ['run command', 'execute command', 'shell', 'terminal', 'cmd']
        },
        'python': {
            'execute': ['run python', 'execute python', 'python script', 'python code'],
            'eval': ['calculate', 'evaluate', 'compute']
        },
        'transform': {
            'json_parse': ['parse json', 'load json', 'decode json'],
            'json_stringify': ['to json', 'json output', 'encode json'],
            'extract': ['extract', 'get field', 'access property'],
            'template': ['template', 'format string', 'substitute']
        }
    }

    def __init__(self, thinking_service=None, doing_service=None):
        """
        Initialize the bridge service

        Args:
            thinking_service: Optional ThinkingService instance for session access
            doing_service: Optional DoingService instance for execution
        """
        self.config = get_config()
        self.logger = get_logger('BridgeService')
        self.thinking = thinking_service
        self.doing = doing_service
        # Configuration
        self.auto_execute = self.config.bridge.auto_execute
        self.validate_before_execute = self.config.bridge.validate_before_execute
        self.max_plan_operations = self.config.bridge.max_plan_operations
        # Active plans
        self.plans: Dict[str, ActionPlan] = {}
        self.logger.info("BridgeService initialized")

    def set_services(self, thinking_service, doing_service):
        """Set service references after initialization"""
        self.thinking = thinking_service
        self.doing = doing_service

    def generate_plan(
        self,
        session_id: str,
        operations: Optional[List[Dict[str, Any]]] = None
    ) -> Dict[str, Any]:
        """
        Generate an action plan from a thinking session

        Args:
            session_id: The thinking session ID
            operations: Optional manual operation list (if not auto-generating)

        Returns:
            The response built by ``bridge_response`` for the generated plan.
            When that response is a dict, a ``plan_id`` key is added (unless
            already present) so the plan can be passed to ``execute_plan``.

        Raises:
            BridgeError: If no operations can be determined, the plan exceeds
                ``max_plan_operations``, or generation fails unexpectedly.
        """
        _plan, response = self._create_plan(session_id, operations)
        return response

    def execute_plan(
        self,
        plan_id: str,
        force: bool = False
    ) -> Dict[str, Any]:
        """
        Execute a generated plan

        Args:
            plan_id: The plan ID to execute
            force: Execute even if validation failed

        Returns:
            Execution results

        Raises:
            BridgeError: If the plan is unknown, was already executed, failed
                validation (and ``force`` is false), no doing service is
                set, or the doing service raises.
        """
        self.logger.info(f"Executing plan {plan_id}")
        try:
            if plan_id not in self.plans:
                raise BridgeError(f"Plan not found: {plan_id}")
            plan = self.plans[plan_id]
            if plan.executed:
                raise BridgeError(
                    f"Plan {plan_id} has already been executed",
                    suggestion="Generate a new plan to execute again"
                )
            # Only gate on validation when validation is enabled; otherwise
            # plan.validated is never set and every plan would be rejected
            # with an empty error list.
            if self.validate_before_execute and not plan.validated and not force:
                raise BridgeError(
                    f"Plan {plan_id} has validation errors",
                    details={'errors': plan.validation_errors},
                    suggestion="Fix errors or use force=True to execute anyway"
                )
            if not self.doing:
                raise BridgeError(
                    "DoingService not available - cannot execute plan"
                )
            # Execute via doing service
            result = self.doing.execute_batch(
                operations=plan.operations,
                batch_name=f"plan_{plan_id}"
            )
            # Marked executed only after execute_batch returns, so a plan
            # whose execution raised can be attempted again.
            plan.executed = True
            plan.execution_result = result
            self.logger.info(f"Plan {plan_id} executed successfully")
            return success_response(
                f"Plan {plan_id} executed",
                data={
                    'plan_id': plan_id,
                    'execution_result': result
                }
            )
        except BridgeError:
            raise
        except Exception as e:
            self.logger.exception(f"Failed to execute plan: {e}")
            raise BridgeError(f"Plan execution failed: {str(e)}") from e

    def think_and_do(
        self,
        problem: str,
        thoughts: List[str],
        operations: List[Dict[str, Any]],
        execute: bool = True
    ) -> Dict[str, Any]:
        """
        Complete cognitive loop in one call: think through a problem, then execute

        Args:
            problem: The problem statement
            thoughts: List of thoughts/reasoning steps
            operations: Operations to execute
            execute: Whether to execute immediately

        Returns:
            Complete results from thinking and doing. The ``data`` payload
            holds ``thinking``, ``plan``, ``plan_id``, ``execution`` (``None``
            when nothing was executed) and ``total_duration_seconds``.

        Raises:
            BridgeError: If any step fails; the original error is chained.
        """
        import time
        import uuid

        # Monotonic clock: the duration cannot be skewed by wall-clock changes
        start_time = time.monotonic()
        self.logger.info(f"Think-and-do: {problem[:50]}...")
        try:
            # Create a pseudo-session for tracking
            session_id = f"tad_{uuid.uuid4().hex[:8]}"
            # Record thoughts (if thinking service available)
            if self.thinking:
                # Start thinking returns the REAL session_id - use it!
                start_result = self.thinking.start_thinking(problem)
                session_id = start_result.get('data', {}).get('session_id', session_id)
                last_index = len(thoughts) - 1
                for i, thought in enumerate(thoughts):
                    self.thinking.think(
                        session_id=session_id,
                        thought=thought,
                        thought_number=i + 1,
                        next_thought_needed=(i < last_index)
                    )
            thinking_summary = {
                'session_id': session_id,
                'problem': problem,
                'thoughts': thoughts,
                'thought_count': len(thoughts)
            }
            # Generate plan; keep the ActionPlan itself so the plan to run is
            # identified directly rather than guessed from self.plans.
            plan, plan_result = self._create_plan(session_id, operations)
            # Execute if requested
            execution_result = None
            if execute and self._is_ready(plan):
                execution_result = self.execute_plan(plan.plan_id)
            total_duration = time.monotonic() - start_time
            # Build comprehensive result
            return success_response(
                "Think-and-do complete",
                data={
                    'thinking': thinking_summary,
                    'plan': plan_result,
                    'plan_id': plan.plan_id,
                    'execution': execution_result,
                    'total_duration_seconds': round(total_duration, 3)
                }
            )
        except Exception as e:
            self.logger.exception(f"Think-and-do failed: {e}")
            raise BridgeError(f"Think-and-do failed: {str(e)}") from e

    def get_plan(self, plan_id: str) -> Dict[str, Any]:
        """
        Get a plan by ID

        Raises:
            BridgeError: If no plan with that ID is stored.
        """
        if plan_id not in self.plans:
            raise BridgeError(f"Plan not found: {plan_id}")
        return success_response(
            f"Plan {plan_id}",
            data=self._describe_plan(self.plans[plan_id])
        )

    def list_plans(self) -> Dict[str, Any]:
        """List all plans"""
        plans = [self._describe_plan(p) for p in self.plans.values()]
        return success_response(
            f"{len(plans)} plans",
            data={'plans': plans}
        )

    def _is_ready(self, plan: 'ActionPlan') -> bool:
        """Return True when the plan may be executed without ``force``."""
        # With validation disabled there is nothing that could clear the plan,
        # so an unvalidated plan counts as cleared.
        cleared = bool(plan.validated) or not self.validate_before_execute
        return cleared and not plan.executed

    def _describe_plan(self, plan: 'ActionPlan') -> Dict[str, Any]:
        """Return ``plan.to_dict()`` with readiness matching ``execute_plan``."""
        summary = plan.to_dict()
        summary['ready_to_execute'] = self._is_ready(plan)
        return summary

    def _create_plan(
        self,
        session_id: str,
        operations: Optional[List[Dict[str, Any]]] = None
    ) -> tuple:
        """
        Build, validate and store a plan; return ``(plan, response)``.

        Shared by ``generate_plan`` and ``think_and_do`` so both get the
        stored ``ActionPlan`` together with its ``bridge_response`` payload.
        """
        import uuid

        self.logger.info(f"Generating plan from session {session_id}")
        try:
            # Get session if thinking service available
            session_data = None
            if self.thinking:
                session_result = self.thinking.get_session(session_id)
                if session_result.get('success'):
                    session_data = session_result.get('data', {})
            # Generate or use provided operations
            if operations:
                plan_operations = operations
            elif session_data:
                plan_operations = self._auto_generate_operations(session_data)
            else:
                raise BridgeError(
                    "Cannot generate plan: no operations provided and no session data available"
                )
            # Validate operation count
            if len(plan_operations) > self.max_plan_operations:
                raise BridgeError(
                    f"Too many operations: {len(plan_operations)} (max: {self.max_plan_operations})"
                )
            # Create plan
            plan_id = f"plan_{uuid.uuid4().hex[:12]}"
            plan = ActionPlan(
                plan_id=plan_id,
                session_id=session_id,
                operations=plan_operations
            )
            # Validate if configured
            if self.validate_before_execute:
                validation_errors = self._validate_operations(plan_operations)
                plan.validated = len(validation_errors) == 0
                plan.validation_errors = validation_errors
            self.plans[plan_id] = plan
            self.logger.info(
                f"Generated plan {plan_id}: {len(plan_operations)} operations, "
                f"validated={plan.validated}"
            )
            response = bridge_response(
                thoughts_count=len(session_data.get('thoughts', [])) if session_data else 0,
                operations_generated=len(plan_operations),
                operations=plan_operations,
                ready_to_execute=self._is_ready(plan),
                validation_passed=plan.validated,
                validation_errors=plan.validation_errors if not plan.validated else None
            )
            # Expose the plan ID so callers can execute the plan later.
            # setdefault keeps any plan_id the response helper already set.
            if isinstance(response, dict):
                response.setdefault('plan_id', plan_id)
            return plan, response
        except BridgeError:
            raise
        except Exception as e:
            self.logger.exception(f"Failed to generate plan: {e}")
            raise BridgeError(f"Plan generation failed: {str(e)}") from e

    def _auto_generate_operations(
        self,
        session_data: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """
        Auto-generate operations from session thoughts

        This is a simplified implementation - in practice you might
        want more sophisticated NLP or LLM-based generation.

        Each thought is matched against ``OPERATION_KEYWORDS``. A thought
        yields at most one operation per service (the first method whose
        keywords match). Thoughts may be dict-like (``content`` and
        ``thought_number`` keys) or plain strings, in which case the 1-based
        position is used as the thought number. Thoughts with no content and
        entries of any other type are skipped.
        """
        operations = []
        thoughts = session_data.get('thoughts') or []
        for position, thought in enumerate(thoughts, start=1):
            if isinstance(thought, str):
                raw_content, thought_number = thought, position
            elif hasattr(thought, 'get'):
                raw_content = thought.get('content') or ''
                thought_number = thought.get('thought_number')
            else:
                continue
            if not isinstance(raw_content, str):
                raw_content = str(raw_content)
            content = raw_content.lower()
            if not content:
                continue
            # Check for operation keywords
            for service, methods in self.OPERATION_KEYWORDS.items():
                for method, keywords in methods.items():
                    if any(kw in content for kw in keywords):
                        operations.append({
                            'service': service,
                            'method': method,
                            'params': {},
                            'description': f"From thought: {raw_content[:50]}...",
                            'source_thought': thought_number
                        })
                        # First matching method wins for this service
                        break
        return operations

    def _validate_operations(
        self,
        operations: List[Dict[str, Any]]
    ) -> List[str]:
        """
        Validate operations and return list of errors

        Checks that each operation is a dict with ``service`` and ``method``
        fields. When a doing service is set, fields that are present are also
        checked against ``self.doing.services``; a missing field is reported
        once as missing rather than again as unknown.
        """
        errors = []
        registry = self.doing.services if self.doing else None
        for i, op in enumerate(operations):
            if not isinstance(op, dict):
                errors.append(
                    f"Operation {i}: must be a dict, got {type(op).__name__}"
                )
                continue
            has_service = 'service' in op
            has_method = 'method' in op
            if not has_service:
                errors.append(f"Operation {i}: missing 'service' field")
            if not has_method:
                errors.append(f"Operation {i}: missing 'method' field")
            # Validate against doing service if available
            if registry is None or not has_service:
                continue
            service = op['service']
            if service not in registry:
                errors.append(
                    f"Operation {i}: unknown service '{service}'"
                )
            elif has_method and op['method'] not in registry.get(service, {}):
                errors.append(
                    f"Operation {i}: unknown method '{op['method']}' "
                    f"in service '{service}'"
                )
        return errors