Skip to main content
Glama

Katamari MCP Server

by ciphernaut
models.py8.37 kB
""" TaskMaster data models for stateful task management. """ from enum import Enum from dataclasses import dataclass, field from datetime import datetime from typing import Dict, Any, Optional, List import uuid class TaskStatus(Enum): """Task lifecycle status.""" PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" PAUSED = "paused" class TaskType(Enum): """Task execution types.""" ONESHOT = "oneshot" # Single execution RECURRING = "recurring" # Scheduled recurring WORKFLOW = "workflow" # Multi-step workflow INTERACTIVE = "interactive" # Requires user input class TaskPriority(Enum): """Task execution priority.""" LOW = 1 NORMAL = 5 HIGH = 8 CRITICAL = 10 @dataclass class TaskResult: """Result of task execution.""" task_id: str status: TaskStatus result: Optional[Any] = None error: Optional[str] = None execution_time: float = 0.0 output: Optional[str] = None artifacts: Dict[str, Any] = field(default_factory=dict) created_at: datetime = field(default_factory=datetime.now) completed_at: Optional[datetime] = None @dataclass class Task: """Background task definition.""" task_id: str = field(default_factory=lambda: str(uuid.uuid4())) name: str = "" description: str = "" task_type: TaskType = TaskType.ONESHOT priority: TaskPriority = TaskPriority.NORMAL # Execution details command: str = "" parameters: Dict[str, Any] = field(default_factory=dict) capability: Optional[str] = None # Scheduling scheduled_at: Optional[datetime] = None timeout: Optional[float] = None max_retries: int = 3 retry_count: int = 0 # Dependencies depends_on: List[str] = field(default_factory=list) blocks: List[str] = field(default_factory=list) # State tracking status: TaskStatus = TaskStatus.PENDING progress: float = 0.0 created_at: datetime = field(default_factory=datetime.now) started_at: Optional[datetime] = None completed_at: Optional[datetime] = None # Results and logs result: Optional[TaskResult] = None logs: List[str] = field(default_factory=list) error_message: Optional[str] = None # Configuration auto_retry: bool = True notify_on_completion: bool = True persistent: bool = True def to_dict(self) -> Dict[str, Any]: """Convert to dictionary representation.""" return { 'task_id': self.task_id, 'name': self.name, 'description': self.description, 'task_type': self.task_type.value, 'priority': self.priority.value, 'command': self.command, 'parameters': self.parameters, 'capability': self.capability, 'scheduled_at': self.scheduled_at.isoformat() if self.scheduled_at else None, 'timeout': self.timeout, 'max_retries': self.max_retries, 'retry_count': self.retry_count, 'depends_on': self.depends_on, 'blocks': self.blocks, 'status': self.status.value, 'progress': self.progress, 'created_at': self.created_at.isoformat(), 'started_at': self.started_at.isoformat() if self.started_at else None, 'completed_at': self.completed_at.isoformat() if self.completed_at else None, 'result': self.result.to_dict() if self.result else None, 'logs': self.logs, 'error_message': self.error_message, 'auto_retry': self.auto_retry, 'notify_on_completion': self.notify_on_completion, 'persistent': self.persistent } @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'Task': """Create from dictionary representation.""" task = cls() task.task_id = data.get('task_id', task.task_id) task.name = data.get('name', '') task.description = data.get('description', '') task.task_type = TaskType(data.get('task_type', TaskType.ONESHOT.value)) task.priority = TaskPriority(data.get('priority', TaskPriority.NORMAL.value)) task.command = data.get('command', '') task.parameters = data.get('parameters', {}) task.capability = data.get('capability') # Dates if data.get('scheduled_at'): task.scheduled_at = datetime.fromisoformat(data['scheduled_at']) if data.get('started_at'): task.started_at = datetime.fromisoformat(data['started_at']) if data.get('completed_at'): task.completed_at = datetime.fromisoformat(data['completed_at']) if data.get('created_at'): task.created_at = datetime.fromisoformat(data['created_at']) # Other fields task.timeout = data.get('timeout') task.max_retries = data.get('max_retries', 3) task.retry_count = data.get('retry_count', 0) task.depends_on = data.get('depends_on', []) task.blocks = data.get('blocks', []) task.status = TaskStatus(data.get('status', TaskStatus.PENDING.value)) task.progress = data.get('progress', 0.0) task.logs = data.get('logs', []) task.error_message = data.get('error_message') task.auto_retry = data.get('auto_retry', True) task.notify_on_completion = data.get('notify_on_completion', True) task.persistent = data.get('persistent', True) return task @dataclass class TaskDependency: """Task dependency relationship.""" task_id: str depends_on: str dependency_type: str = "completion" # completion, success, failure def is_satisfied(self, tasks: Dict[str, Task]) -> bool: """Check if dependency is satisfied.""" if self.depends_on not in tasks: return False dep_task = tasks[self.depends_on] if self.dependency_type == "completion": return dep_task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED] elif self.dependency_type == "success": return dep_task.status == TaskStatus.COMPLETED elif self.dependency_type == "failure": return dep_task.status == TaskStatus.FAILED return False @dataclass class TaskMetrics: """Task execution metrics.""" total_tasks: int = 0 pending_tasks: int = 0 running_tasks: int = 0 completed_tasks: int = 0 failed_tasks: int = 0 cancelled_tasks: int = 0 average_execution_time: float = 0.0 success_rate: float = 0.0 def update_from_tasks(self, tasks: List[Task]): """Update metrics from task list.""" self.total_tasks = len(tasks) self.pending_tasks = sum(1 for t in tasks if t.status == TaskStatus.PENDING) self.running_tasks = sum(1 for t in tasks if t.status == TaskStatus.RUNNING) self.completed_tasks = sum(1 for t in tasks if t.status == TaskStatus.COMPLETED) self.failed_tasks = sum(1 for t in tasks if t.status == TaskStatus.FAILED) self.cancelled_tasks = sum(1 for t in tasks if t.status == TaskStatus.CANCELLED) # Calculate success rate finished_tasks = self.completed_tasks + self.failed_tasks + self.cancelled_tasks if finished_tasks > 0: self.success_rate = self.completed_tasks / finished_tasks # Calculate average execution time completed_with_results = [t for t in tasks if t.result and t.result.execution_time > 0] if completed_with_results: self.average_execution_time = sum(t.result.execution_time for t in completed_with_results) / len(completed_with_results) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { 'total_tasks': self.total_tasks, 'pending_tasks': self.pending_tasks, 'running_tasks': self.running_tasks, 'completed_tasks': self.completed_tasks, 'failed_tasks': self.failed_tasks, 'cancelled_tasks': self.cancelled_tasks, 'average_execution_time': self.average_execution_time, 'success_rate': self.success_rate }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server