# executor.py (8.47 kB)
"""execution engine with type coercion, timeouts, retries, structured errors."""
from __future__ import annotations
import asyncio
import inspect
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
from dataclasses import dataclass, asdict
import time
from .reflection import ToolMetadata
from .pagination import PaginationDetector, PaginationAdapter, PaginationConfig
from .safety import SafetyValidator, SafetyConfig
from .auth_plugins import AuthManager
@dataclass
class ExecutionResult:
    """Outcome of a single tool execution.

    Instances are plain data: every field except ``success`` has a default,
    so callers only fill in what applies to the outcome being reported.
    """

    # True when the call completed (or was reported as a dry run).
    success: bool
    # Value produced by the tool, or the dry-run description.
    result: Any = None
    # Structured error payload as a dict; None when there is no error.
    error: Optional[Dict[str, Any]] = None
    # Elapsed time for the whole execution, in milliseconds.
    duration_ms: float = 0.0
    # True when the tool was not actually invoked.
    dry_run: bool = False
@dataclass
class ExecutionError:
    """Structured description of a failed execution.

    ``code`` and ``message`` are required; the remaining fields are optional
    context and default to None.
    """

    # Short machine-readable identifier for the failure.
    code: str
    # Human-readable description of what went wrong.
    message: str
    # Optional suggestion on how to resolve the failure.
    hint: Optional[str] = None
    # Optional identifier of where the failure came from.
    origin: Optional[str] = None
    # Optional free-form extra information.
    details: Optional[Dict[str, Any]] = None
class TypeCoercer:
    """Coerce JSON-decoded argument values to annotated Python types."""

    def coerce(self, value: Any, target_type: type) -> Any:
        """Coerce ``value`` to ``target_type``.

        Conversion rules:
            * no annotation (``inspect.Parameter.empty``) or ``None`` value:
              returned unchanged;
            * value already an instance of ``target_type``: returned unchanged;
            * ``int`` / ``float`` / ``str`` / ``Path``: the type's constructor;
            * ``bool``: the strings "true", "1" and "yes" (case-insensitive)
              map to True, every other string to False; non-strings use
              ``bool(value)``;
            * ``datetime``: strings are parsed with
              ``datetime.fromisoformat``; other values pass through;
            * anything else, including annotations that are not classes
              (string annotations, subscripted generics such as
              ``List[int]``): returned unchanged.

        Args:
            value: The raw argument value.
            target_type: The annotation taken from the tool's signature.

        Returns:
            The coerced value, or ``value`` itself when no rule applies.

        Raises:
            ValueError: If a conversion rule applies but fails. The original
                exception is chained as ``__cause__``.
        """
        if target_type is inspect.Parameter.empty or value is None:
            return value

        try:
            if isinstance(value, target_type):
                return value
        except TypeError:
            # isinstance() rejects annotations that are not classes (string
            # annotations, List[int], ...). No conversion rule exists for
            # those, so pass the value through instead of crashing.
            return value

        try:
            if target_type is int:
                return int(value)
            if target_type is float:
                return float(value)
            if target_type is bool:
                if isinstance(value, str):
                    return value.lower() in ("true", "1", "yes")
                return bool(value)
            if target_type is str:
                return str(value)
            if target_type is Path:
                return Path(value)
            if target_type is datetime and isinstance(value, str):
                return datetime.fromisoformat(value)
        except (ValueError, TypeError) as exc:
            type_name = getattr(target_type, "__name__", repr(target_type))
            raise ValueError(
                f"cannot coerce {value!r} to {type_name}: {exc}"
            ) from exc

        # No rule for this target type: leave the value as it is.
        return value
class Executor:
    """Execute SDK tool calls with safety checks, retries, and pagination.

    The public entry point is :meth:`execute`, which never propagates an
    ordinary exception: every failure is reported as an ``ExecutionResult``
    with ``success=False`` and a structured error payload.
    """

    # Lower-case substrings of an exception message that mark a failure as
    # transient and therefore worth retrying.
    _TRANSIENT_PATTERNS = (
        "timeout", "connection", "network", "unavailable",
        "too many requests", "rate limit", "503", "429",
    )

    def __init__(
        self,
        safety_config: Optional[SafetyConfig] = None,
        max_pagination_items: int = 100
    ):
        """Wire up the collaborators used during execution.

        Args:
            safety_config: Safety settings; a default ``SafetyConfig()`` is
                used when omitted.
            max_pagination_items: Passed to ``PaginationAdapter`` as
                ``max_items``.
        """
        self.safety = SafetyValidator(safety_config or SafetyConfig())
        self.pagination_detector = PaginationDetector()
        self.pagination_adapter = PaginationAdapter(max_items=max_pagination_items)
        self.coercer = TypeCoercer()
        self.auth_manager = AuthManager()

    async def execute(
        self,
        tool: ToolMetadata,
        args: Optional[Dict[str, Any]] = None
    ) -> ExecutionResult:
        """Run ``tool`` with ``args`` through the full execution pipeline.

        Steps, in order: safety validation, dry-run check, argument coercion,
        auth injection (only when ``tool.class_name`` is set), pagination
        detection, the call itself (with retries), and secret redaction.

        Args:
            tool: Metadata describing the callable to run.
            args: Keyword arguments for the tool; ``None`` means no arguments.

        Returns:
            An ``ExecutionResult``. Any ``Exception`` raised along the way is
            converted into a failed result rather than propagated.
        """
        # perf_counter is monotonic, so the reported duration cannot be
        # distorted by wall-clock adjustments.
        started = time.perf_counter()

        def elapsed_ms() -> float:
            return (time.perf_counter() - started) * 1000

        args = args or {}
        try:
            # Safety gate: a non-empty message means the call is refused.
            error_msg = self.safety.validate_call(
                tool.fq_name,
                tool.is_mutating,
                (),
                args
            )
            if error_msg:
                blocked = ExecutionError(
                    code="SAFETY_BLOCKED",
                    message=error_msg,
                    hint="enable allow_mutating in safety config if intentional"
                )
                return ExecutionResult(
                    success=False,
                    error=asdict(blocked),
                    duration_ms=elapsed_ms()
                )

            # Dry run: describe the call instead of performing it.
            if not self.safety.should_execute(tool.is_mutating):
                return ExecutionResult(
                    success=True,
                    result=self.safety.create_dry_run_response(tool.fq_name, (), args),
                    duration_ms=elapsed_ms(),
                    dry_run=True
                )

            coerced_kwargs = self._coerce_arguments(tool, args)

            if tool.class_name:
                # NOTE(review): type(tool.callable_obj) is the type of the
                # callable itself, which may not be the owning class --
                # confirm what AuthManager.inject_auth expects here.
                coerced_kwargs = self.auth_manager.inject_auth(
                    tool.module,
                    type(tool.callable_obj),
                    coerced_kwargs
                )

            pagination_config = self.pagination_detector.detect(
                tool.callable_obj,
                tool.signature
            )

            result = await self._execute_with_retry(
                tool,
                coerced_kwargs,
                pagination_config
            )

            # Strip secrets before the result leaves the executor.
            result = self.safety.redact_secrets(result)
            return ExecutionResult(
                success=True,
                result=result,
                duration_ms=elapsed_ms()
            )
        except Exception as exc:
            # Top-level boundary: report the failure as structured data.
            return ExecutionResult(
                success=False,
                error=asdict(self._create_error(exc)),
                duration_ms=elapsed_ms()
            )

    def _coerce_arguments(
        self,
        tool: ToolMetadata,
        args: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Coerce JSON arguments to the types annotated on the tool.

        Arguments that are not in the signature, or whose parameter has no
        annotation, are copied through unchanged. When the tool has no
        signature, ``args`` itself is returned.

        Raises:
            ValueError: Propagated from the coercer when a value cannot be
                converted.
        """
        if not tool.signature:
            return args

        parameters = tool.signature.parameters
        coerced = {}
        for name, value in args.items():
            param = parameters.get(name)
            if param is not None and param.annotation is not inspect.Parameter.empty:
                value = self.coercer.coerce(value, param.annotation)
            coerced[name] = value
        return coerced

    async def _execute_with_retry(
        self,
        tool: ToolMetadata,
        kwargs: Dict[str, Any],
        pagination_config: PaginationConfig
    ) -> Any:
        """Call the tool, retrying transient failures with backoff.

        ``safety.config.max_retries`` is the total number of attempts; at
        least one attempt is always made, even when it is configured as 0 or
        a negative number. Between attempts the coroutine sleeps for
        ``2 ** attempt`` seconds.

        Returns:
            The tool's result after it has been passed through the
            pagination adapter.

        Raises:
            TimeoutError: When an async tool exceeds
                ``safety.config.timeout_seconds``; timeouts are not retried.
            Exception: A non-transient error is re-raised immediately; a
                transient one is re-raised once the attempts are used up.
        """
        config = self.safety.config
        # range(0) would skip the call entirely, so guarantee one attempt.
        attempts = max(1, config.max_retries)
        last_error = None

        for attempt in range(attempts):
            try:
                if tool.is_async:
                    result = await asyncio.wait_for(
                        tool.callable_obj(**kwargs),
                        timeout=config.timeout_seconds
                    )
                else:
                    # NOTE: synchronous tools run inline on the event loop
                    # and are not subject to timeout_seconds.
                    result = tool.callable_obj(**kwargs)

                # The call has already happened; the adapter is handed a
                # thunk that simply returns its result.
                return self.pagination_adapter.execute(
                    lambda: result,
                    (),
                    {},
                    pagination_config
                )
            except asyncio.TimeoutError as exc:
                raise TimeoutError(
                    f"execution exceeded {config.timeout_seconds}s timeout"
                ) from exc
            except Exception as exc:
                last_error = exc
                if not self._is_transient_error(exc):
                    raise
                # Exponential backoff, skipped after the final attempt.
                if attempt < attempts - 1:
                    await asyncio.sleep(2 ** attempt)

        raise last_error or RuntimeError("max retries exceeded")

    def _is_transient_error(self, exc: Exception) -> bool:
        """Return True when ``exc`` looks transient and worth retrying.

        An exception qualifies when it is a ``ConnectionError`` or
        ``TimeoutError`` instance, or when its lower-cased message contains
        one of ``_TRANSIENT_PATTERNS``.
        """
        # Type check first: these exceptions often carry an empty message or
        # one such as "timed out" that matches none of the text patterns.
        if isinstance(exc, (ConnectionError, TimeoutError)):
            return True
        message = str(exc).lower()
        return any(pattern in message for pattern in self._TRANSIENT_PATTERNS)

    def _create_error(self, exc: Exception) -> ExecutionError:
        """Build a structured ``ExecutionError`` from an exception.

        The exception's class name becomes the error code, its string form
        the message, and its class's module the origin.
        """
        exc_type = type(exc)
        return ExecutionError(
            code=exc_type.__name__,
            message=str(exc),
            origin=exc_type.__module__,
            details={"type": exc_type.__name__}
        )