Skip to main content
Glama

MCP Development Server

by dillip285
workflow.py9.32 kB
"""Development workflow management for environments.""" from typing import Dict, List, Optional, Any, Callable from enum import Enum import asyncio from ..utils.logging import setup_logging from ..utils.errors import WorkflowError logger = setup_logging(__name__) class TaskStatus(str, Enum): """Workflow task status.""" PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" SKIPPED = "skipped" class Task: """Represents a workflow task.""" def __init__( self, name: str, command: str, environment: str, dependencies: Optional[List[str]] = None, timeout: Optional[int] = None, retry_count: int = 0, on_success: Optional[Callable] = None, on_failure: Optional[Callable] = None ): self.name = name self.command = command self.environment = environment self.dependencies = dependencies or [] self.timeout = timeout self.retry_count = retry_count self.status = TaskStatus.PENDING self.result: Optional[Dict[str, Any]] = None self.on_success = on_success self.on_failure = on_failure self.attempts = 0 class Workflow: """Manages development workflows.""" def __init__(self, env_manager): self.env_manager = env_manager self.tasks: Dict[str, Task] = {} self.running = False def add_task(self, task: Task) -> None: """Add a task to the workflow.""" self.tasks[task.name] = task def remove_task(self, task_name: str) -> None: """Remove a task from the workflow.""" if task_name in self.tasks: del self.tasks[task_name] async def execute(self) -> Dict[str, Any]: """Execute the workflow.""" try: self.running = True results = {} # Build dependency graph graph = self._build_dependency_graph() # Execute tasks in order for task_group in graph: # Execute tasks in group concurrently tasks = [self._execute_task(task_name) for task_name in task_group] group_results = await asyncio.gather(*tasks, return_exceptions=True) # Process results for task_name, result in zip(task_group, group_results): if isinstance(result, Exception): self.tasks[task_name].status = TaskStatus.FAILED results[task_name] = { "status": TaskStatus.FAILED, "error": str(result) } else: results[task_name] = result return results except Exception as e: raise WorkflowError(f"Workflow execution failed: {str(e)}") finally: self.running = False async def _execute_task(self, task_name: str) -> Dict[str, Any]: """Execute a single task.""" task = self.tasks[task_name] # Check dependencies for dep in task.dependencies: dep_task = self.tasks.get(dep) if not dep_task or dep_task.status != TaskStatus.COMPLETED: task.status = TaskStatus.SKIPPED return { "status": TaskStatus.SKIPPED, "reason": f"Dependency {dep} not satisfied" } task.status = TaskStatus.RUNNING task.attempts += 1 try: # Execute the command result = await asyncio.wait_for( self.env_manager.execute_in_environment( task.environment, task.command ), timeout=task.timeout ) # Handle execution result if result['exit_code'] == 0: task.status = TaskStatus.COMPLETED if task.on_success: await task.on_success(result) return { "status": TaskStatus.COMPLETED, "result": result } else: # Handle retry logic if task.attempts < task.retry_count + 1: logger.info(f"Retrying task {task_name} (attempt {task.attempts})") return await self._execute_task(task_name) task.status = TaskStatus.FAILED if task.on_failure: await task.on_failure(result) return { "status": TaskStatus.FAILED, "result": result } except asyncio.TimeoutError: task.status = TaskStatus.FAILED return { "status": TaskStatus.FAILED, "error": "Task timeout" } except Exception as e: task.status = TaskStatus.FAILED return { "status": TaskStatus.FAILED, "error": str(e) } def _build_dependency_graph(self) -> List[List[str]]: """Build ordered list of task groups based on dependencies.""" # Initialize variables graph: List[List[str]] = [] completed = set() remaining = set(self.tasks.keys()) while remaining: # Find tasks with satisfied dependencies group = set() for task_name in remaining: task = self.tasks[task_name] if all(dep in completed for dep in task.dependencies): group.add(task_name) if not group: # Circular dependency detected raise WorkflowError("Circular dependency detected in workflow") # Add group to graph graph.append(list(group)) completed.update(group) remaining.difference_update(group) return graph def get_status(self) -> Dict[str, Any]: """Get workflow status.""" return { "running": self.running, "tasks": { name: { "status": task.status, "attempts": task.attempts, "dependencies": task.dependencies } for name, task in self.tasks.items() } } def reset(self) -> None: """Reset workflow state.""" for task in self.tasks.values(): task.status = TaskStatus.PENDING task.attempts = 0 task.result = None self.running = False # Example workflow definitions for common development tasks class CommonWorkflows: """Predefined development workflows.""" @staticmethod def create_build_workflow(env_manager, environment: str) -> Workflow: """Create a standard build workflow.""" workflow = Workflow(env_manager) # Install dependencies workflow.add_task(Task( name="install_deps", command="npm install", environment=environment, retry_count=2 )) # Run linter workflow.add_task(Task( name="lint", command="npm run lint", environment=environment, dependencies=["install_deps"] )) # Run tests workflow.add_task(Task( name="test", command="npm run test", environment=environment, dependencies=["install_deps"] )) # Build workflow.add_task(Task( name="build", command="npm run build", environment=environment, dependencies=["lint", "test"] )) return workflow @staticmethod def create_test_workflow(env_manager, environment: str) -> Workflow: """Create a standard test workflow.""" workflow = Workflow(env_manager) # Install test dependencies workflow.add_task(Task( name="install_test_deps", command="npm install --only=dev", environment=environment, retry_count=2 )) # Run unit tests workflow.add_task(Task( name="unit_tests", command="npm run test:unit", environment=environment, dependencies=["install_test_deps"] )) # Run integration tests workflow.add_task(Task( name="integration_tests", command="npm run test:integration", environment=environment, dependencies=["install_test_deps"] )) # Generate coverage report workflow.add_task(Task( name="coverage", command="npm run coverage", environment=environment, dependencies=["unit_tests", "integration_tests"] )) return workflow

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dillip285/mcp-dev-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server