"""
DoingService - Sequential execution with variable piping
=========================================================
Executes operation batches sequentially with variable substitution
from previous results. This is the "do" in Think. Do. Done.
Based on the batch_service from opus-warrior-CF-mcp, extended
with more execution backends and better integration.
"""
import re
import time
import subprocess
import sys
import json
from pathlib import Path
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from ..config import get_config
from ..logger import get_logger
from ..utils.errors import (
DoingError,
ValidationError,
ExecutionError,
TimeoutError,
VariableSubstitutionError,
ServiceNotFoundError,
SecurityError
)
from ..utils.responses import doing_response, success_response, error_response
@dataclass
class Operation:
    """A single validated operation queued for execution in a batch."""

    # Position of the operation inside its batch (0-based).
    index: int
    # Name of the backend service, e.g. 'file' or 'shell'.
    service: str
    # Name of the method on that service, e.g. 'read'.
    method: str
    # Keyword arguments handed to the service method.
    params: Dict[str, Any]
    # Optional free-text description supplied by the caller.
    description: Optional[str] = None
    # Optional per-operation timeout supplied by the caller.
    timeout: Optional[int] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the operation as a plain dictionary keyed by field name."""
        field_names = (
            'index',
            'service',
            'method',
            'params',
            'description',
            'timeout',
        )
        return {name: getattr(self, name) for name in field_names}
@dataclass
class OperationResult:
    """Outcome of executing one operation of a batch."""

    # Position of the operation inside its batch (0-based).
    index: int
    # Service and method that were invoked.
    service: str
    method: str
    # True when the operation finished without raising.
    success: bool
    # Value returned by the service method (reported only on success).
    result: Any = None
    # Error message (reported only on failure).
    error: Optional[str] = None
    # Time spent on the operation, in seconds.
    duration_seconds: float = 0.0
    # Variable references that were substituted into the parameters.
    variables_substituted: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """
        Return the result as a plain dictionary.

        The dictionary carries 'result' for successful operations and
        'error' for failed ones; 'variables_substituted' is included only
        when at least one variable was substituted. The duration is rounded
        to three decimals.
        """
        payload: Dict[str, Any] = {
            'index': self.index,
            'service': self.service,
            'method': self.method,
            'success': self.success,
            'duration_seconds': round(self.duration_seconds, 3),
        }
        outcome_key = 'result' if self.success else 'error'
        payload[outcome_key] = getattr(self, outcome_key)
        if self.variables_substituted:
            payload['variables_substituted'] = self.variables_substituted
        return payload
class DoingService:
    """
    Sequential execution service with variable piping

    Features:
    - Execute operations sequentially
    - Variable substitution from previous results ($results[N].path.to.value)
    - Multiple execution backends (python, shell, file, transform)
    - Per-operation timeout, forwarded to backends that accept one
    - Full timing and logging
    - Stop-on-error or continue modes
    """

    # Variable pattern: $results[N].path.to.value
    VARIABLE_PATTERN = re.compile(r'\$results\[(\d+)\]((?:\.\w+)*)')

    # Dangerous patterns to block (searched case-insensitively)
    DANGEROUS_PATTERNS = [
        r'rm\s+-rf\s+/',
        r'del\s+/[sf]',
        r'format\s+[a-z]:',
        r'>\s*/dev/sd',
        r'dd\s+if=.*of=/dev',
        r'mkfs\.',
        # Fork bomb, e.g. ":(){ :|:& };:". The parentheses and braces must
        # be escaped; unescaped "()" is an empty group and never matches.
        r':\(\)\s*\{.*\}\s*;\s*:',
    ]

    def __init__(self):
        """Initialize the doing service from the project configuration."""
        self.config = get_config()
        self.logger = get_logger('DoingService')

        # Configuration
        self.max_operations = self.config.doing.max_operations
        # NOTE(review): max_content_size is stored here but no method
        # visible in this class reads it - confirm whether it should be
        # enforced by the file backend.
        self.max_content_size = self.config.doing.max_content_size_mb * 1024 * 1024
        self.default_timeout = self.config.doing.default_timeout
        self.max_timeout = self.config.doing.max_timeout
        self.stop_on_error = self.config.doing.stop_on_error
        self.track_timing = self.config.doing.track_timing

        # Available services: service name -> method name -> callable
        self.services: Dict[str, Dict[str, Callable]] = {
            'python': {
                'execute': self._python_execute,
                'eval': self._python_eval,
            },
            'shell': {
                'run': self._shell_run,
            },
            'file': {
                'read': self._file_read,
                'write': self._file_write,
                'exists': self._file_exists,
            },
            'transform': {
                'json_parse': self._transform_json_parse,
                'json_stringify': self._transform_json_stringify,
                'extract': self._transform_extract,
                'template': self._transform_template,
            }
        }

        self.logger.info(
            f"DoingService initialized: max_ops={self.max_operations}, "
            f"timeout={self.default_timeout}s, services={list(self.services.keys())}"
        )

    def execute_batch(
        self,
        operations: List[Dict[str, Any]],
        batch_name: Optional[str] = None,
        stop_on_error: Optional[bool] = None,
        track_timing: Optional[bool] = None
    ) -> Dict[str, Any]:
        """
        Execute a batch of operations sequentially

        Args:
            operations: List of operations to execute
            batch_name: Optional name for the batch
            stop_on_error: Stop on first error (default from config)
            track_timing: Accepted for interface compatibility; durations
                are always recorded for every operation and for the batch

        Returns:
            Batch execution results as built by success_response

        Raises:
            ValidationError: If the operations list is malformed
            DoingError: If the batch fails outside of a single operation

        Example operations:
            [
                {
                    'service': 'file',
                    'method': 'read',
                    'params': {'path': '/path/to/file.txt'}
                },
                {
                    'service': 'transform',
                    'method': 'json_parse',
                    'params': {'content': '$results[0].result.content'}
                },
                {
                    'service': 'python',
                    'method': 'execute',
                    'params': {'code': 'print(len($results[1].result.data))'}
                }
            ]
        """
        batch_name = batch_name or f"batch_{int(time.time())}"
        stop_on_error = stop_on_error if stop_on_error is not None else self.stop_on_error

        # Guard the count so that an empty/None list reaches validation and
        # is reported as a ValidationError instead of a TypeError here.
        operation_count = len(operations) if operations else 0
        self.logger.info(f"Executing batch '{batch_name}': {operation_count} operations")

        try:
            # Validate operations
            validated_ops = self._validate_operations(operations)

            # Execute sequentially; perf_counter is monotonic, so durations
            # are not distorted by wall-clock adjustments.
            results: List['OperationResult'] = []
            batch_start = time.perf_counter()

            for op in validated_ops:
                op_start = time.perf_counter()
                try:
                    result, vars_used = self._run_operation(op, results)
                except Exception as e:
                    # Any failure of a single operation is recorded as a
                    # failed result rather than aborting the whole batch.
                    op_duration = time.perf_counter() - op_start
                    results.append(OperationResult(
                        index=op.index,
                        service=op.service,
                        method=op.method,
                        success=False,
                        error=str(e),
                        duration_seconds=op_duration
                    ))
                    self.logger.warning(
                        f"[{op.index + 1}/{len(validated_ops)}] "
                        f"{op.service}.{op.method}: FAILED - {str(e)}"
                    )
                    if stop_on_error:
                        self.logger.info("Stopping batch due to error (stop_on_error=True)")
                        break
                else:
                    op_duration = time.perf_counter() - op_start
                    results.append(OperationResult(
                        index=op.index,
                        service=op.service,
                        method=op.method,
                        success=True,
                        result=result,
                        duration_seconds=op_duration,
                        variables_substituted=vars_used
                    ))
                    self.logger.info(
                        f"[{op.index + 1}/{len(validated_ops)}] "
                        f"{op.service}.{op.method}: SUCCESS ({op_duration:.2f}s)"
                    )

            # Build summary
            batch_duration = time.perf_counter() - batch_start
            successful = sum(1 for r in results if r.success)
            failed = len(results) - successful

            return success_response(
                f"Batch '{batch_name}' completed: {successful}/{len(results)} succeeded",
                data={
                    'batch_name': batch_name,
                    'total_operations': len(validated_ops),
                    'completed': len(results),
                    'successful': successful,
                    'failed': failed,
                    'stopped_early': len(results) < len(validated_ops),
                    'duration_seconds': round(batch_duration, 3),
                    'results': [r.to_dict() for r in results]
                }
            )
        except ValidationError:
            raise
        except Exception as e:
            self.logger.exception(f"Batch execution failed: {e}")
            # Chain the cause so the original traceback is preserved.
            raise DoingError(f"Batch execution failed: {str(e)}") from e

    def _run_operation(
        self,
        op: 'Operation',
        results: List['OperationResult']
    ) -> tuple[Any, List[str]]:
        """
        Resolve, check and execute one operation.

        Args:
            op: The validated operation to run
            results: Results of the operations executed so far

        Returns:
            Tuple of (value returned by the service method, variables used)
        """
        # Resolve variables from previous results
        resolved_params, vars_used = self._resolve_variables(op.params, results)

        # Security check on resolved params
        self._security_check(resolved_params)

        # Get service and method
        service = self.services.get(op.service)
        if not service:
            raise ServiceNotFoundError(f"Service '{op.service}' not found")
        method = service.get(op.method)
        if not method:
            raise ServiceNotFoundError(
                f"Method '{op.method}' not found in service '{op.service}'"
            )

        # Execute with timeout, capped by the configured maximum
        timeout = op.timeout or self.default_timeout
        if timeout is not None and self.max_timeout:
            timeout = min(timeout, self.max_timeout)
        return self._execute_with_timeout(method, resolved_params, timeout), vars_used

    def execute_single(
        self,
        service: str,
        method: str,
        params: Dict[str, Any],
        timeout: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Execute a single operation

        Runs a one-element batch named 'single_op' and returns its only
        per-operation result.

        Args:
            service: Service name
            method: Method name
            params: Operation parameters
            timeout: Operation timeout

        Returns:
            Operation result
        """
        result = self.execute_batch(
            operations=[{
                'service': service,
                'method': method,
                'params': params,
                'timeout': timeout
            }],
            batch_name='single_op'
        )

        # Extract single result
        if result['data']['results']:
            return result['data']['results'][0]
        return error_response("No result from operation")

    def list_services(self) -> Dict[str, Any]:
        """List available services and their methods"""
        service_info = {
            service_name: list(methods.keys())
            for service_name, methods in self.services.items()
        }
        return success_response(
            f"{len(self.services)} services available",
            data={
                'services': service_info,
                'variable_syntax': '$results[N].path.to.value',
                'example': {
                    'service': 'file',
                    'method': 'read',
                    'params': {'path': '/example/file.txt'}
                }
            }
        )

    # =========================================================================
    # Validation
    # =========================================================================

    def _validate_operations(self, operations: List[Dict[str, Any]]) -> List['Operation']:
        """
        Validate and convert operations

        Args:
            operations: Raw operation dictionaries supplied by the caller

        Returns:
            One Operation per input dictionary, indexed by position

        Raises:
            ValidationError: If the list is empty or too long, or if an
                entry is not a dict, lacks 'service'/'method', names an
                unknown service or method, or has non-dict 'params'
        """
        if not operations:
            raise ValidationError("Operations list cannot be empty")
        if len(operations) > self.max_operations:
            raise ValidationError(
                f"Too many operations: {len(operations)} (max: {self.max_operations})"
            )

        validated = []
        for i, op in enumerate(operations):
            if not isinstance(op, dict):
                raise ValidationError(f"Operation {i} must be a dictionary")
            if 'service' not in op:
                raise ValidationError(f"Operation {i} missing 'service' field")
            if 'method' not in op:
                raise ValidationError(f"Operation {i} missing 'method' field")

            # Verify service exists
            if op['service'] not in self.services:
                raise ValidationError(
                    f"Operation {i}: Unknown service '{op['service']}'",
                    details={'available': list(self.services.keys())}
                )

            # Verify method exists
            if op['method'] not in self.services[op['service']]:
                raise ValidationError(
                    f"Operation {i}: Unknown method '{op['method']}' in service '{op['service']}'",
                    details={'available': list(self.services[op['service']].keys())}
                )

            # An explicit None means "no parameters"; anything else that is
            # not a dict would only fail later during variable resolution.
            params = op.get('params')
            if params is None:
                params = {}
            elif not isinstance(params, dict):
                raise ValidationError(f"Operation {i}: 'params' must be a dictionary")

            validated.append(Operation(
                index=i,
                service=op['service'],
                method=op['method'],
                params=params,
                description=op.get('description'),
                timeout=op.get('timeout')
            ))
        return validated

    def _security_check(self, params: Dict[str, Any]) -> None:
        """
        Check parameters for dangerous patterns

        Every string found in the parameters (keys and values, at any
        nesting depth) is searched, plus the JSON rendering of the whole
        structure when one can be produced.

        Raises:
            SecurityError: If any entry of DANGEROUS_PATTERNS matches
        """
        # Raw strings are scanned because JSON escaping turns tabs and
        # newlines into "\\t" / "\\n", which the \s patterns cannot match.
        candidates = list(self._iter_strings(params))
        try:
            candidates.append(json.dumps(params, default=str))
        except (TypeError, ValueError):
            # Not JSON-representable (e.g. unsupported key types or a
            # circular reference); the raw strings are still scanned.
            pass

        for pattern in self.DANGEROUS_PATTERNS:
            for text in candidates:
                if re.search(pattern, text, re.IGNORECASE):
                    raise SecurityError(
                        "Potentially dangerous operation detected",
                        details={'pattern': pattern}
                    )

    @staticmethod
    def _iter_strings(value: Any):
        """Yield every string contained in a nested dict/list/tuple/set structure."""
        pending = [value]
        visited = set()  # ids of containers already walked (cycle guard)
        while pending:
            current = pending.pop()
            if isinstance(current, str):
                yield current
            elif isinstance(current, dict):
                if id(current) not in visited:
                    visited.add(id(current))
                    pending.extend(current.keys())
                    pending.extend(current.values())
            elif isinstance(current, (list, tuple, set, frozenset)):
                if id(current) not in visited:
                    visited.add(id(current))
                    pending.extend(current)

    # =========================================================================
    # Variable Resolution
    # =========================================================================

    def _resolve_variables(
        self,
        params: Dict[str, Any],
        results: List['OperationResult']
    ) -> tuple[Dict[str, Any], List[str]]:
        """
        Resolve variable references in parameters

        Strings are resolved wherever they appear, including inside nested
        dictionaries and lists; other values are passed through unchanged.

        Returns:
            Tuple of (resolved_params, list_of_variables_used)
        """
        vars_used: List[str] = []
        resolved = {
            key: self._resolve_value(value, results, vars_used)
            for key, value in params.items()
        }
        return resolved, vars_used

    def _resolve_value(
        self,
        value: Any,
        results: List['OperationResult'],
        vars_used: List[str]
    ) -> Any:
        """Resolve one parameter value recursively, recording variables in vars_used."""
        if isinstance(value, str):
            resolved, found_vars = self._resolve_string(value, results)
            vars_used.extend(found_vars)
            return resolved
        if isinstance(value, dict):
            return {
                key: self._resolve_value(item, results, vars_used)
                for key, item in value.items()
            }
        if isinstance(value, list):
            return [self._resolve_value(item, results, vars_used) for item in value]
        return value

    def _resolve_string(
        self,
        text: str,
        results: List['OperationResult']
    ) -> tuple[Any, List[str]]:
        """
        Resolve variable references in a string

        Returns:
            Tuple of (resolved value, variables used). When the whole
            string is one variable the extracted value is returned with its
            original type; otherwise each reference is replaced by its
            string form (None becomes an empty string).
        """
        # If entire string is a single variable, return the extracted value
        whole = self.VARIABLE_PATTERN.fullmatch(text)
        if whole:
            value = self._extract_value(results, int(whole.group(1)), whole.group(2))
            return value, [whole.group(0)]

        vars_used: List[str] = []

        def substitute(match):
            value = self._extract_value(results, int(match.group(1)), match.group(2))
            vars_used.append(match.group(0))
            return '' if value is None else str(value)

        # One pass over the original text: a short reference such as
        # "$results[1]" can no longer clobber the prefix of a longer one
        # ("$results[1].result"), and substituted text is not re-expanded.
        return self.VARIABLE_PATTERN.sub(substitute, text), vars_used

    def _extract_value(
        self,
        results: List['OperationResult'],
        index: int,
        path: str
    ) -> Any:
        """
        Extract a value from results using index and path

        Args:
            results: Results collected so far
            index: Position of the result to read
            path: Dotted path (with leading dot) into the result's
                to_dict() form; numeric parts index into lists/tuples

        Raises:
            VariableSubstitutionError: If the index or path cannot be resolved
        """
        if index < 0 or index >= len(results):
            raise VariableSubstitutionError(
                f"Invalid result index: {index}",
                details={'available_results': len(results)}
            )

        current = results[index].to_dict()
        if not path:
            return current

        for part in path.lstrip('.').split('.'):
            if isinstance(current, dict):
                if part not in current:
                    raise VariableSubstitutionError(
                        f"Path '{path}' not found in result {index}",
                        details={'missing_key': part}
                    )
                current = current[part]
            elif isinstance(current, (list, tuple)) and part.isdecimal():
                position = int(part)
                if position >= len(current):
                    raise VariableSubstitutionError(
                        f"Path '{path}' not found in result {index}",
                        details={'missing_index': position}
                    )
                current = current[position]
            else:
                raise VariableSubstitutionError(
                    f"Cannot navigate path '{path}' - value is not a dict"
                )
        return current

    # =========================================================================
    # Execution
    # =========================================================================

    def _execute_with_timeout(
        self,
        method: Callable,
        params: Dict[str, Any],
        timeout: int
    ) -> Any:
        """
        Execute a method with timeout

        The call itself is synchronous. The timeout is honoured by passing
        it on to methods that declare a 'timeout' parameter, unless the
        caller already supplied one in params.

        Args:
            method: Service method to call
            params: Keyword arguments for the method
            timeout: Timeout to forward, or None to forward nothing

        Returns:
            Whatever the method returns
        """
        import inspect

        call_params = params
        if timeout is not None and 'timeout' not in params:
            try:
                accepts_timeout = 'timeout' in inspect.signature(method).parameters
            except (TypeError, ValueError):
                # Signature not introspectable; call without forwarding.
                accepts_timeout = False
            if accepts_timeout:
                call_params = {**params, 'timeout': timeout}
        return method(**call_params)

    # =========================================================================
    # Service Implementations
    # =========================================================================

    def _python_execute(self, code: str, timeout: int = 30) -> Dict[str, Any]:
        """
        Execute Python code snippet

        The code is written to a temporary .py file and run with the
        current interpreter in a subprocess with stdin closed.

        Returns:
            Dict with 'stdout', 'stderr', 'exit_code' and 'success'

        Raises:
            TimeoutError: If the subprocess exceeds the timeout
        """
        import tempfile

        # Write to temp file
        handle = tempfile.NamedTemporaryFile(
            mode='w', suffix='.py', delete=False, encoding='utf-8'
        )
        temp_path = handle.name
        try:
            # Writing inside the try ensures the file is removed even when
            # the write itself fails.
            with handle:
                handle.write(code)
            result = subprocess.run(
                [sys.executable, temp_path],
                capture_output=True,
                text=True,
                timeout=timeout,
                stdin=subprocess.DEVNULL
            )
            return {
                'stdout': result.stdout,
                'stderr': result.stderr,
                'exit_code': result.returncode,
                'success': result.returncode == 0
            }
        except subprocess.TimeoutExpired as exc:
            raise TimeoutError(f"Python execution exceeded {timeout}s timeout") from exc
        finally:
            Path(temp_path).unlink(missing_ok=True)

    def _python_eval(self, expression: str) -> Any:
        """
        Evaluate a Python expression (limited scope)

        Raises:
            ExecutionError: If evaluation raises
        """
        # Only a small set of builtins is exposed to the expression.
        allowed_builtins = {
            'len': len, 'str': str, 'int': int, 'float': float,
            'list': list, 'dict': dict, 'bool': bool,
            'min': min, 'max': max, 'sum': sum, 'sorted': sorted,
            'abs': abs, 'round': round
        }
        try:
            # NOTE(review): restricting __builtins__ is not a sandbox -
            # eval() on untrusted input can still escape through attribute
            # access. Confirm callers are trusted before exposing this.
            return eval(expression, {"__builtins__": allowed_builtins}, {})
        except Exception as e:
            raise ExecutionError(f"Eval failed: {str(e)}") from e

    def _shell_run(
        self,
        command: str,
        timeout: int = 30,
        cwd: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Run a shell command

        Returns:
            Dict with 'stdout', 'stderr', 'exit_code' and 'success'

        Raises:
            TimeoutError: If the command exceeds the timeout
        """
        try:
            # NOTE(review): shell=True executes the string through the
            # shell; the pattern blocklist is the only filter visible in
            # this class, so commands must come from trusted callers.
            result = subprocess.run(
                command,
                shell=True,
                capture_output=True,
                text=True,
                timeout=timeout,
                cwd=cwd,
                stdin=subprocess.DEVNULL
            )
            return {
                'stdout': result.stdout,
                'stderr': result.stderr,
                'exit_code': result.returncode,
                'success': result.returncode == 0
            }
        except subprocess.TimeoutExpired as exc:
            raise TimeoutError(f"Shell command exceeded {timeout}s timeout") from exc

    def _file_read(self, path: str, encoding: str = 'utf-8') -> Dict[str, Any]:
        """
        Read a file

        Returns:
            Dict with 'content', absolute 'path' and 'size_bytes'

        Raises:
            ExecutionError: If the path does not exist
        """
        file_path = Path(path)
        if not file_path.exists():
            raise ExecutionError(f"File not found: {path}")
        content = file_path.read_text(encoding=encoding)
        return {
            'content': content,
            'path': str(file_path.absolute()),
            'size_bytes': len(content.encode(encoding))
        }

    def _file_write(
        self,
        path: str,
        content: str,
        encoding: str = 'utf-8'
    ) -> Dict[str, Any]:
        """
        Write to a file, creating missing parent directories

        Returns:
            Dict with absolute 'path', 'size_bytes' and 'success'
        """
        file_path = Path(path)
        file_path.parent.mkdir(parents=True, exist_ok=True)
        file_path.write_text(content, encoding=encoding)
        return {
            'path': str(file_path.absolute()),
            'size_bytes': len(content.encode(encoding)),
            'success': True
        }

    def _file_exists(self, path: str) -> Dict[str, Any]:
        """
        Check if a file exists

        Returns:
            Dict with absolute 'path', 'exists', 'is_file' and 'is_dir'
        """
        file_path = Path(path)
        exists = file_path.exists()
        return {
            'path': str(file_path.absolute()),
            'exists': exists,
            'is_file': file_path.is_file() if exists else False,
            'is_dir': file_path.is_dir() if exists else False
        }

    def _transform_json_parse(self, content: str) -> Any:
        """Parse JSON string"""
        return json.loads(content)

    def _transform_json_stringify(self, data: Any, indent: int = 2) -> str:
        """Stringify data to JSON"""
        return json.dumps(data, indent=indent)

    def _transform_extract(self, data: Any, path: str) -> Any:
        """
        Extract a value from nested data using dot notation

        Dict parts are looked up by key and numeric parts index into
        lists. Returns None when any part of the path cannot be followed,
        including a list index that is out of range.
        """
        current = data
        for part in path.split('.'):
            if isinstance(current, dict):
                current = current.get(part)
            elif isinstance(current, list) and part.isdecimal():
                position = int(part)
                if position >= len(current):
                    # Same outcome as a missing dict key.
                    return None
                current = current[position]
            else:
                return None
        return current

    def _transform_template(self, template: str, data: Dict[str, Any]) -> str:
        """Simple template substitution: replaces each '{key}' with str(value)"""
        result = template
        for key, value in data.items():
            result = result.replace(f'{{{key}}}', str(value))
        return result