# models.py (8.37 kB)
"""
TaskMaster data models for stateful task management.
"""
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Any, Optional, List
import uuid
class TaskStatus(Enum):
    """Lifecycle states of a task.

    Every member's value is the lowercase string form of its name.
    """

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
    PAUSED = "paused"
class TaskType(Enum):
    """Kinds of task execution."""

    # Single execution
    ONESHOT = "oneshot"
    # Scheduled recurring
    RECURRING = "recurring"
    # Multi-step workflow
    WORKFLOW = "workflow"
    # Requires user input
    INTERACTIVE = "interactive"
class TaskPriority(Enum):
    """Priority levels for task execution.

    Values are integers that increase from LOW (1) to CRITICAL (10).
    """

    LOW = 1
    NORMAL = 5
    HIGH = 8
    CRITICAL = 10
@dataclass
class TaskResult:
    """Result of task execution.

    ``to_dict`` / ``from_dict`` convert to and from a plain-dict form in
    which ``status`` is stored by enum value and datetimes as ISO-8601
    strings.
    """

    task_id: str
    # Quoted so the annotation is not evaluated when the class is created.
    status: "TaskStatus"
    result: Optional[Any] = None
    error: Optional[str] = None
    execution_time: float = 0.0
    output: Optional[str] = None
    artifacts: Dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.now)
    completed_at: Optional[datetime] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary representation.

        Returns:
            A dict with one key per field. ``status`` is emitted as its
            ``.value``, ``created_at`` / ``completed_at`` as ISO-8601
            strings (``completed_at`` is ``None`` when unset), and
            ``artifacts`` as a shallow copy. ``result`` is passed through
            unchanged.
        """
        completed = self.completed_at
        return {
            'task_id': self.task_id,
            'status': self.status.value,
            'result': self.result,
            'error': self.error,
            'execution_time': self.execution_time,
            'output': self.output,
            'artifacts': dict(self.artifacts),
            'created_at': self.created_at.isoformat(),
            'completed_at': completed.isoformat() if completed is not None else None,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'TaskResult':
        """Create from the dictionary form produced by ``to_dict``.

        Args:
            data: Mapping that must contain ``task_id`` and ``status``;
                every other key is optional and falls back to the field
                default when missing or ``None``.

        Raises:
            KeyError: If ``task_id`` or ``status`` is missing.
            ValueError: If ``status`` is not a valid ``TaskStatus`` value or
                a timestamp is not ISO-8601.
        """
        kwargs: Dict[str, Any] = {
            'task_id': data['task_id'],
            'status': TaskStatus(data['status']),
            'result': data.get('result'),
            'error': data.get('error'),
            'output': data.get('output'),
            # Copy so the new instance does not share the caller's dict.
            'artifacts': dict(data.get('artifacts') or {}),
        }
        if data.get('execution_time') is not None:
            kwargs['execution_time'] = data['execution_time']
        # Timestamps are only overridden when present, so a missing
        # created_at keeps the dataclass default.
        for key in ('created_at', 'completed_at'):
            if data.get(key):
                kwargs[key] = datetime.fromisoformat(data[key])
        return cls(**kwargs)
@dataclass
class Task:
    """Background task definition.

    ``to_dict`` / ``from_dict`` convert to and from a plain-dict form in
    which enums are stored by value and datetimes as ISO-8601 strings. The
    attached ``result`` (if any) is included in that round trip.
    """

    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    description: str = ""
    task_type: TaskType = TaskType.ONESHOT
    priority: TaskPriority = TaskPriority.NORMAL

    # Execution details
    command: str = ""
    parameters: Dict[str, Any] = field(default_factory=dict)
    capability: Optional[str] = None

    # Scheduling
    scheduled_at: Optional[datetime] = None
    timeout: Optional[float] = None
    max_retries: int = 3
    retry_count: int = 0

    # Dependencies
    depends_on: List[str] = field(default_factory=list)
    blocks: List[str] = field(default_factory=list)

    # State tracking
    status: TaskStatus = TaskStatus.PENDING
    progress: float = 0.0
    created_at: datetime = field(default_factory=datetime.now)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    # Results and logs
    result: Optional[TaskResult] = None
    logs: List[str] = field(default_factory=list)
    error_message: Optional[str] = None

    # Configuration
    auto_retry: bool = True
    notify_on_completion: bool = True
    persistent: bool = True

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary representation.

        Returns:
            A dict with one key per field. Enums are emitted as their
            ``.value``, datetimes as ISO-8601 strings (``None`` when
            unset), and ``result`` as a nested dict (``None`` when unset).
            ``parameters``, ``depends_on``, ``blocks`` and ``logs`` are
            shallow copies, so editing the returned containers does not
            change the task.
        """
        return {
            'task_id': self.task_id,
            'name': self.name,
            'description': self.description,
            'task_type': self.task_type.value,
            'priority': self.priority.value,
            'command': self.command,
            'parameters': dict(self.parameters),
            'capability': self.capability,
            'scheduled_at': self._iso(self.scheduled_at),
            'timeout': self.timeout,
            'max_retries': self.max_retries,
            'retry_count': self.retry_count,
            'depends_on': list(self.depends_on),
            'blocks': list(self.blocks),
            'status': self.status.value,
            'progress': self.progress,
            'created_at': self.created_at.isoformat(),
            'started_at': self._iso(self.started_at),
            'completed_at': self._iso(self.completed_at),
            'result': self._result_to_dict(self.result),
            'logs': list(self.logs),
            'error_message': self.error_message,
            'auto_retry': self.auto_retry,
            'notify_on_completion': self.notify_on_completion,
            'persistent': self.persistent,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Task':
        """Create from dictionary representation.

        Any key that is missing or ``None`` falls back to the field's
        default (a missing ``task_id`` gets a freshly generated one).
        Container values are shallow-copied so the new task does not share
        mutable state with ``data``.

        Args:
            data: Mapping in the shape produced by ``to_dict``.

        Raises:
            ValueError: If an enum value is not recognised or a timestamp
                string is not ISO-8601.
        """
        kwargs: Dict[str, Any] = {}

        # Plain values are taken as-is.
        for key in (
            'task_id', 'name', 'description', 'command', 'capability',
            'timeout', 'max_retries', 'retry_count', 'progress',
            'error_message', 'auto_retry', 'notify_on_completion',
            'persistent',
        ):
            value = data.get(key)
            if value is not None:
                kwargs[key] = value

        # Enums are stored by value.
        for key, enum_cls in (
            ('task_type', TaskType),
            ('priority', TaskPriority),
            ('status', TaskStatus),
        ):
            value = data.get(key)
            if value is not None:
                kwargs[key] = enum_cls(value)

        # Dates: empty/absent values keep the field default.
        for key in ('scheduled_at', 'started_at', 'completed_at', 'created_at'):
            moment = cls._parse_datetime(data.get(key))
            if moment is not None:
                kwargs[key] = moment

        # Containers are copied so the task never aliases the caller's data.
        if data.get('parameters') is not None:
            kwargs['parameters'] = dict(data['parameters'])
        for key in ('depends_on', 'blocks', 'logs'):
            if data.get(key) is not None:
                kwargs[key] = list(data[key])

        task = cls(**kwargs)

        # Restored after construction so a result without its own task_id
        # can inherit the final id of the task.
        raw_result = data.get('result')
        if raw_result is not None:
            task.result = cls._result_from_dict(raw_result, task.task_id)
        return task

    @staticmethod
    def _iso(moment: Optional[datetime]) -> Optional[str]:
        """Return ``moment`` as an ISO-8601 string, or ``None`` if unset."""
        return moment.isoformat() if moment is not None else None

    @staticmethod
    def _parse_datetime(raw: Any) -> Optional[datetime]:
        """Turn a stored timestamp back into a datetime (``None`` if empty)."""
        if not raw:
            return None
        if isinstance(raw, datetime):
            return raw
        return datetime.fromisoformat(raw)

    @staticmethod
    def _result_to_dict(result: Optional[TaskResult]) -> Optional[Dict[str, Any]]:
        """Serialize an attached result, or return ``None`` when there is none.

        Delegates to the result's own ``to_dict()`` when it provides one;
        otherwise the TaskResult fields are serialized here.
        """
        if result is None:
            return None
        own_to_dict = getattr(result, 'to_dict', None)
        if callable(own_to_dict):
            return own_to_dict()
        return {
            'task_id': result.task_id,
            'status': result.status.value,
            'result': result.result,
            'error': result.error,
            'execution_time': result.execution_time,
            'output': result.output,
            'artifacts': dict(result.artifacts),
            'created_at': result.created_at.isoformat(),
            'completed_at': Task._iso(result.completed_at),
        }

    @staticmethod
    def _result_from_dict(raw: Any, fallback_task_id: str) -> TaskResult:
        """Rebuild a TaskResult from its dictionary form.

        An existing TaskResult instance is returned unchanged. A missing
        ``task_id`` falls back to ``fallback_task_id`` and a missing
        ``status`` to PENDING, mirroring the default used for the task's
        own status.
        """
        if isinstance(raw, TaskResult):
            return raw
        kwargs: Dict[str, Any] = {
            'task_id': raw.get('task_id') or fallback_task_id,
            'status': TaskStatus(raw.get('status') or TaskStatus.PENDING.value),
            'result': raw.get('result'),
            'error': raw.get('error'),
            'execution_time': raw.get('execution_time') or 0.0,
            'output': raw.get('output'),
            'artifacts': dict(raw.get('artifacts') or {}),
            'completed_at': Task._parse_datetime(raw.get('completed_at')),
        }
        created = Task._parse_datetime(raw.get('created_at'))
        if created is not None:
            kwargs['created_at'] = created
        return TaskResult(**kwargs)
@dataclass
class TaskDependency:
    """A single dependency edge from one task to another."""

    task_id: str
    depends_on: str
    # One of: completion, success, failure
    dependency_type: str = "completion"

    def is_satisfied(self, tasks: Dict[str, Task]) -> bool:
        """Report whether the task named by ``depends_on`` meets this dependency.

        Args:
            tasks: Mapping from task id to task.

        Returns:
            ``False`` if ``depends_on`` is not in ``tasks`` or the
            dependency type is not recognised. Otherwise:
            "completion" holds when the upstream task is COMPLETED, FAILED
            or CANCELLED; "success" when it is COMPLETED; "failure" when it
            is FAILED.
        """
        if self.depends_on not in tasks:
            return False

        upstream = tasks[self.depends_on]
        kind = self.dependency_type

        if kind == "completion":
            terminal_states = [
                TaskStatus.COMPLETED,
                TaskStatus.FAILED,
                TaskStatus.CANCELLED,
            ]
            return upstream.status in terminal_states
        if kind == "success":
            return upstream.status == TaskStatus.COMPLETED
        if kind == "failure":
            return upstream.status == TaskStatus.FAILED

        # Unknown dependency types are never satisfied.
        return False
@dataclass
class TaskMetrics:
    """Task execution metrics.

    Per-status counters plus two derived figures, all recomputed from a
    task list by ``update_from_tasks``.
    """

    total_tasks: int = 0
    pending_tasks: int = 0
    running_tasks: int = 0
    completed_tasks: int = 0
    failed_tasks: int = 0
    cancelled_tasks: int = 0
    average_execution_time: float = 0.0
    success_rate: float = 0.0

    def update_from_tasks(self, tasks: List["Task"]) -> None:
        """Recompute every metric from ``tasks``.

        All fields are overwritten on each call, so an instance can be
        reused without carrying figures over from a previous task list.
        Tasks whose status has no dedicated counter (e.g. PAUSED) count
        toward ``total_tasks`` only.

        Args:
            tasks: Tasks to summarise. Each needs a ``status`` and a
                ``result`` attribute.
        """
        self.total_tasks = len(tasks)
        self.pending_tasks = sum(1 for t in tasks if t.status == TaskStatus.PENDING)
        self.running_tasks = sum(1 for t in tasks if t.status == TaskStatus.RUNNING)
        self.completed_tasks = sum(1 for t in tasks if t.status == TaskStatus.COMPLETED)
        self.failed_tasks = sum(1 for t in tasks if t.status == TaskStatus.FAILED)
        self.cancelled_tasks = sum(1 for t in tasks if t.status == TaskStatus.CANCELLED)

        # Success rate = completed / (completed + failed + cancelled).
        # Reset to 0.0 when nothing has finished, so no stale value remains.
        finished = self.completed_tasks + self.failed_tasks + self.cancelled_tasks
        self.success_rate = self.completed_tasks / finished if finished > 0 else 0.0

        # Average only over tasks that carry a result with a positive
        # execution time; reset to 0.0 when there are none.
        timings = [
            t.result.execution_time
            for t in tasks
            if t.result and t.result.execution_time > 0
        ]
        self.average_execution_time = sum(timings) / len(timings) if timings else 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary.

        Returns:
            A dict with one key per metric field, holding its current value.
        """
        return {
            'total_tasks': self.total_tasks,
            'pending_tasks': self.pending_tasks,
            'running_tasks': self.running_tasks,
            'completed_tasks': self.completed_tasks,
            'failed_tasks': self.failed_tasks,
            'cancelled_tasks': self.cancelled_tasks,
            'average_execution_time': self.average_execution_time,
            'success_rate': self.success_rate,
        }