Skip to main content
Glama
architect.py5.39 kB
""" Workflow Architect Service - The Constructor Handles creation, updating, and reading workflow structures with Smart Upsert. """ import json from typing import List, Dict, Any, Optional, Union from app.core.client import get_client, safe_tool from app.core.config import settings from app.core.logging import architect_logger as logger def _parse_json_safe(data: Union[str, List, Dict], field_name: str) -> Union[List, Dict]: """ Smart parser: accepts both JSON strings and native Python objects. Provides detailed error messages on parse failure. """ if isinstance(data, (list, dict)): return data try: return json.loads(data) except json.JSONDecodeError as e: raise ValueError( f"Invalid JSON in '{field_name}': {e.msg} at line {e.lineno}, column {e.colno}. " f"Character position: {e.pos}. Content preview: '{data[max(0, e.pos-20):e.pos+20]}...'" ) async def _find_workflow_by_name(name: str) -> Optional[str]: """ Search for a workflow by name and return its ID if found. Used for Smart Upsert logic. """ client = get_client() data = await client.get("/workflows") workflows = data.get("data", []) for wf in workflows: if wf.get("name", "").lower() == name.lower(): logger.info(f"Found existing workflow '{name}' with ID: {wf['id']}") return wf["id"] return None @safe_tool async def read_workflow_structure(workflow_id: str) -> str: """ Get the full JSON structure (nodes & connections) of a workflow. Vital for reverse-engineering or analyzing existing bots. Args: workflow_id: The ID of the workflow to read. Returns: JSON string with the complete workflow specification. """ logger.info(f"Reading workflow structure: {workflow_id}") client = get_client() data = await client.get(f"/workflows/{workflow_id}") return json.dumps(data, indent=2) @safe_tool async def deploy_workflow( name: str, nodes: Union[str, List[Dict[str, Any]]], connections: Union[str, Dict[str, Any]], activate: bool = False ) -> str: """ Smart Upsert: Create or Update a workflow. - If a workflow with the same name exists, UPDATE it. - If not, CREATE a new one. Args: name: Name of the workflow. nodes: List of node definitions (can be JSON string or list). connections: Connection mapping between nodes (can be JSON string or dict). activate: Whether to activate the workflow after deployment. Returns: JSON string with deployment result including browser URL. """ # Smart parsing with detailed error messages parsed_nodes = _parse_json_safe(nodes, "nodes") parsed_connections = _parse_json_safe(connections, "connections") # Validation if not parsed_nodes or not isinstance(parsed_nodes, list): raise ValueError("Nodes must be a non-empty list of node definitions.") logger.info(f"Deploying workflow: '{name}' with {len(parsed_nodes)} nodes") # Smart Upsert: Check if workflow exists existing_id = await _find_workflow_by_name(name) payload = { "name": name, "nodes": parsed_nodes, "connections": parsed_connections, "settings": { "saveManualExecutions": True, "saveExecutionProgress": True } } client = get_client() if existing_id: # UPDATE existing workflow logger.info(f"Updating existing workflow {existing_id}") data = await client.put(f"/workflows/{existing_id}", json_data=payload) action = "updated" workflow_id = existing_id else: # CREATE new workflow logger.info("Creating new workflow") data = await client.post("/workflows", json_data=payload) action = "created" workflow_id = data["id"] # Activate if requested if activate: logger.info(f"Activating workflow {workflow_id}") await client.post(f"/workflows/{workflow_id}/activate") # Build browser URL editor_url = f"{settings.n8n_editor_url}/workflow/{workflow_id}" result = { "status": "success", "action": action, "id": workflow_id, "name": data.get("name", name), "active": activate, "editor_url": editor_url, "node_count": len(parsed_nodes) } logger.info(f"Workflow {action}: {workflow_id} → {editor_url}") return json.dumps(result, indent=2) @safe_tool async def clone_workflow(source_id: str, new_name: str, activate: bool = False) -> str: """ Clone an existing workflow with a new name. Args: source_id: ID of the workflow to clone. new_name: Name for the cloned workflow. activate: Whether to activate the cloned workflow. Returns: JSON string with the new workflow details. """ logger.info(f"Cloning workflow {source_id} as '{new_name}'") # Read source workflow client = get_client() source = await client.get(f"/workflows/{source_id}") # Deploy as new workflow return await deploy_workflow( name=new_name, nodes=source.get("nodes", []), connections=source.get("connections", {}), activate=activate )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SrAndres629/n8n_dev_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server