MCP Development Server
by dillip285
- src
- mcp_dev_server
- environments
"""Development workflow management for environments."""
from typing import Dict, List, Optional, Any, Callable
from enum import Enum
import asyncio
from ..utils.logging import setup_logging
from ..utils.errors import WorkflowError
logger = setup_logging(__name__)
class TaskStatus(str, Enum):
"""Workflow task status."""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
class Task:
"""Represents a workflow task."""
def __init__(
self,
name: str,
command: str,
environment: str,
dependencies: Optional[List[str]] = None,
timeout: Optional[int] = None,
retry_count: int = 0,
on_success: Optional[Callable] = None,
on_failure: Optional[Callable] = None
):
self.name = name
self.command = command
self.environment = environment
self.dependencies = dependencies or []
self.timeout = timeout
self.retry_count = retry_count
self.status = TaskStatus.PENDING
self.result: Optional[Dict[str, Any]] = None
self.on_success = on_success
self.on_failure = on_failure
self.attempts = 0
class Workflow:
"""Manages development workflows."""
def __init__(self, env_manager):
self.env_manager = env_manager
self.tasks: Dict[str, Task] = {}
self.running = False
def add_task(self, task: Task) -> None:
"""Add a task to the workflow."""
self.tasks[task.name] = task
def remove_task(self, task_name: str) -> None:
"""Remove a task from the workflow."""
if task_name in self.tasks:
del self.tasks[task_name]
async def execute(self) -> Dict[str, Any]:
"""Execute the workflow."""
try:
self.running = True
results = {}
# Build dependency graph
graph = self._build_dependency_graph()
# Execute tasks in order
for task_group in graph:
# Execute tasks in group concurrently
tasks = [self._execute_task(task_name) for task_name in task_group]
group_results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
for task_name, result in zip(task_group, group_results):
if isinstance(result, Exception):
self.tasks[task_name].status = TaskStatus.FAILED
results[task_name] = {
"status": TaskStatus.FAILED,
"error": str(result)
}
else:
results[task_name] = result
return results
except Exception as e:
raise WorkflowError(f"Workflow execution failed: {str(e)}")
finally:
self.running = False
async def _execute_task(self, task_name: str) -> Dict[str, Any]:
"""Execute a single task."""
task = self.tasks[task_name]
# Check dependencies
for dep in task.dependencies:
dep_task = self.tasks.get(dep)
if not dep_task or dep_task.status != TaskStatus.COMPLETED:
task.status = TaskStatus.SKIPPED
return {
"status": TaskStatus.SKIPPED,
"reason": f"Dependency {dep} not satisfied"
}
task.status = TaskStatus.RUNNING
task.attempts += 1
try:
# Execute the command
result = await asyncio.wait_for(
self.env_manager.execute_in_environment(
task.environment,
task.command
),
timeout=task.timeout
)
# Handle execution result
if result['exit_code'] == 0:
task.status = TaskStatus.COMPLETED
if task.on_success:
await task.on_success(result)
return {
"status": TaskStatus.COMPLETED,
"result": result
}
else:
# Handle retry logic
if task.attempts < task.retry_count + 1:
logger.info(f"Retrying task {task_name} (attempt {task.attempts})")
return await self._execute_task(task_name)
task.status = TaskStatus.FAILED
if task.on_failure:
await task.on_failure(result)
return {
"status": TaskStatus.FAILED,
"result": result
}
except asyncio.TimeoutError:
task.status = TaskStatus.FAILED
return {
"status": TaskStatus.FAILED,
"error": "Task timeout"
}
except Exception as e:
task.status = TaskStatus.FAILED
return {
"status": TaskStatus.FAILED,
"error": str(e)
}
def _build_dependency_graph(self) -> List[List[str]]:
"""Build ordered list of task groups based on dependencies."""
# Initialize variables
graph: List[List[str]] = []
completed = set()
remaining = set(self.tasks.keys())
while remaining:
# Find tasks with satisfied dependencies
group = set()
for task_name in remaining:
task = self.tasks[task_name]
if all(dep in completed for dep in task.dependencies):
group.add(task_name)
if not group:
# Circular dependency detected
raise WorkflowError("Circular dependency detected in workflow")
# Add group to graph
graph.append(list(group))
completed.update(group)
remaining.difference_update(group)
return graph
def get_status(self) -> Dict[str, Any]:
"""Get workflow status."""
return {
"running": self.running,
"tasks": {
name: {
"status": task.status,
"attempts": task.attempts,
"dependencies": task.dependencies
}
for name, task in self.tasks.items()
}
}
def reset(self) -> None:
"""Reset workflow state."""
for task in self.tasks.values():
task.status = TaskStatus.PENDING
task.attempts = 0
task.result = None
self.running = False
# Example workflow definitions for common development tasks
class CommonWorkflows:
"""Predefined development workflows."""
@staticmethod
def create_build_workflow(env_manager, environment: str) -> Workflow:
"""Create a standard build workflow."""
workflow = Workflow(env_manager)
# Install dependencies
workflow.add_task(Task(
name="install_deps",
command="npm install",
environment=environment,
retry_count=2
))
# Run linter
workflow.add_task(Task(
name="lint",
command="npm run lint",
environment=environment,
dependencies=["install_deps"]
))
# Run tests
workflow.add_task(Task(
name="test",
command="npm run test",
environment=environment,
dependencies=["install_deps"]
))
# Build
workflow.add_task(Task(
name="build",
command="npm run build",
environment=environment,
dependencies=["lint", "test"]
))
return workflow
@staticmethod
def create_test_workflow(env_manager, environment: str) -> Workflow:
"""Create a standard test workflow."""
workflow = Workflow(env_manager)
# Install test dependencies
workflow.add_task(Task(
name="install_test_deps",
command="npm install --only=dev",
environment=environment,
retry_count=2
))
# Run unit tests
workflow.add_task(Task(
name="unit_tests",
command="npm run test:unit",
environment=environment,
dependencies=["install_test_deps"]
))
# Run integration tests
workflow.add_task(Task(
name="integration_tests",
command="npm run test:integration",
environment=environment,
dependencies=["install_test_deps"]
))
# Generate coverage report
workflow.add_task(Task(
name="coverage",
command="npm run coverage",
environment=environment,
dependencies=["unit_tests", "integration_tests"]
))
return workflow