"""Workflow service for robotics-webapp backend."""
import logging
from typing import Any
from mcp_client import mcp_client
logger = logging.getLogger(__name__)
class WorkflowService:
"""Service for managing workflows via MCP client."""
def __init__(self):
"""Initialize workflow service."""
self.mcp_client = mcp_client
async def create_workflow(self, workflow_data: dict[str, Any]) -> dict[str, Any]:
"""Create a new workflow.
Args:
workflow_data: Workflow definition.
Returns:
Created workflow data.
"""
try:
result = await self.mcp_client.call_tool(
"robotics",
"workflow_management",
{
"operation": "create",
"workflow_data": workflow_data,
},
)
return result.get("data", {})
except Exception as e:
logger.error(f"Failed to create workflow: {e}", exc_info=True)
raise
async def get_workflow(self, workflow_id: str) -> dict[str, Any]:
"""Get workflow by ID.
Args:
workflow_id: Workflow identifier.
Returns:
Workflow data.
"""
try:
result = await self.mcp_client.call_tool(
"robotics",
"workflow_management",
{
"operation": "read",
"workflow_id": workflow_id,
},
)
return result.get("data", {}).get("workflow", {})
except Exception as e:
logger.error(f"Failed to get workflow: {e}", exc_info=True)
raise
async def update_workflow(
self, workflow_id: str, workflow_data: dict[str, Any]
) -> dict[str, Any]:
"""Update workflow.
Args:
workflow_id: Workflow identifier.
workflow_data: Updated workflow definition.
Returns:
Updated workflow data.
"""
try:
result = await self.mcp_client.call_tool(
"robotics",
"workflow_management",
{
"operation": "update",
"workflow_id": workflow_id,
"workflow_data": workflow_data,
},
)
return result.get("data", {}).get("workflow", {})
except Exception as e:
logger.error(f"Failed to update workflow: {e}", exc_info=True)
raise
async def delete_workflow(self, workflow_id: str) -> dict[str, Any]:
"""Delete workflow.
Args:
workflow_id: Workflow identifier.
Returns:
Deletion result.
"""
try:
result = await self.mcp_client.call_tool(
"robotics",
"workflow_management",
{
"operation": "delete",
"workflow_id": workflow_id,
},
)
return result.get("data", {})
except Exception as e:
logger.error(f"Failed to delete workflow: {e}", exc_info=True)
raise
async def list_workflows(
self,
category: str | None = None,
tags: list[str] | None = None,
search: str | None = None,
) -> list[dict[str, Any]]:
"""List workflows with optional filtering.
Args:
category: Filter by category.
tags: Filter by tags.
search: Search query.
Returns:
List of workflows.
"""
try:
result = await self.mcp_client.call_tool(
"robotics",
"workflow_management",
{
"operation": "list",
"category": category,
"tags": tags,
"search": search,
},
)
return result.get("data", {}).get("workflows", [])
except Exception as e:
logger.error(f"Failed to list workflows: {e}", exc_info=True)
raise
async def execute_workflow(
self, workflow_id: str, variables: dict[str, Any], debug_mode: bool = False
) -> dict[str, Any]:
"""Execute workflow.
Args:
workflow_id: Workflow identifier.
variables: Execution variables.
debug_mode: Enable step-by-step debugging mode.
Returns:
Execution result with execution_id.
"""
try:
result = await self.mcp_client.call_tool(
"robotics",
"workflow_management",
{
"operation": "execute",
"workflow_id": workflow_id,
"variables": variables,
"debug_mode": debug_mode,
},
)
return result.get("data", {})
except Exception as e:
logger.error(f"Failed to execute workflow: {e}", exc_info=True)
raise
async def pause_execution(self, execution_id: str) -> None:
"""Pause workflow execution.
Args:
execution_id: Execution identifier.
"""
try:
await self.mcp_client.call_tool(
"robotics",
"workflow_management",
{
"operation": "pause",
"execution_id": execution_id,
},
)
except Exception as e:
logger.error(f"Failed to pause execution: {e}", exc_info=True)
raise
async def resume_execution(self, execution_id: str) -> None:
"""Resume workflow execution.
Args:
execution_id: Execution identifier.
"""
try:
await self.mcp_client.call_tool(
"robotics",
"workflow_management",
{
"operation": "resume",
"execution_id": execution_id,
},
)
except Exception as e:
logger.error(f"Failed to resume execution: {e}", exc_info=True)
raise
async def step_execution(self, execution_id: str) -> None:
"""Step to next instruction in debug mode.
Args:
execution_id: Execution identifier.
"""
try:
await self.mcp_client.call_tool(
"robotics",
"workflow_management",
{
"operation": "step",
"execution_id": execution_id,
},
)
except Exception as e:
logger.error(f"Failed to step execution: {e}", exc_info=True)
raise
async def continue_execution(self, execution_id: str) -> None:
"""Continue execution from breakpoint.
Args:
execution_id: Execution identifier.
"""
try:
await self.mcp_client.call_tool(
"robotics",
"workflow_management",
{
"operation": "continue",
"execution_id": execution_id,
},
)
except Exception as e:
logger.error(f"Failed to continue execution: {e}", exc_info=True)
raise
async def get_execution_status(self, execution_id: str) -> dict[str, Any]:
"""Get workflow execution status.
Args:
execution_id: Execution identifier.
Returns:
Execution status data.
"""
try:
result = await self.mcp_client.call_tool(
"robotics",
"workflow_management",
{
"operation": "status",
"execution_id": execution_id,
},
)
return result.get("data", {}).get("execution", {})
except Exception as e:
logger.error(f"Failed to get execution status: {e}", exc_info=True)
raise
async def get_templates(self) -> list[dict[str, Any]]:
"""Get workflow templates.
Returns:
List of workflow templates.
"""
try:
result = await self.mcp_client.call_tool(
"robotics",
"workflow_management",
{
"operation": "templates",
},
)
return result.get("data", {}).get("templates", [])
except Exception as e:
logger.error(f"Failed to get templates: {e}", exc_info=True)
raise
async def import_workflow(self, workflow_data: dict[str, Any]) -> dict[str, Any]:
"""Import workflow from JSON.
Args:
workflow_data: Workflow definition.
Returns:
Imported workflow data.
"""
try:
result = await self.mcp_client.call_tool(
"robotics",
"workflow_management",
{
"operation": "import",
"workflow_data": workflow_data,
},
)
return result.get("data", {}).get("workflow", {})
except Exception as e:
logger.error(f"Failed to import workflow: {e}", exc_info=True)
raise
async def export_workflow(self, workflow_id: str) -> dict[str, Any]:
"""Export workflow to JSON.
Args:
workflow_id: Workflow identifier.
Returns:
Exported workflow data.
"""
try:
result = await self.mcp_client.call_tool(
"robotics",
"workflow_management",
{
"operation": "export",
"workflow_id": workflow_id,
},
)
return result.get("data", {})
except Exception as e:
logger.error(f"Failed to export workflow: {e}", exc_info=True)
raise
# Singleton instance
workflow_service = WorkflowService()