Skip to main content
Glama
orchestrate_auto.py23.7 kB
""" OrchestrateAuto tool - v0.2 lightweight multi-tool orchestrator Goal: Given a natural prompt, infer and execute a small plan using existing Zen tools with plug-and-play defaults. This MVP focuses on orchestrating the Analyze workflow in two steps, automatically filling required fields (model, step metadata, files). Future iterations will support tool-introspection, model-driven planning, and multi-tool routing (codereview, testgen, tracer, etc.). """ from __future__ import annotations import os import json import logging from typing import Any, Optional, Literal from pydantic import Field from tools.shared.base_tool import BaseTool from tools.shared.base_models import ToolRequest from tools.registry import ToolRegistry logger = logging.getLogger(__name__) class OrchestrateRequest(ToolRequest): """Request model for OrchestrateAuto tool (v0.2) Fields: - user_prompt: free-form instruction describing what to do - relevant_files: optional absolute paths; if missing, basic heuristics infer a small set - step_budget: max steps to execute (v0.2 supports 1-3) - dry_run: if true, return the plan only (no execution) """ user_prompt: str = Field(..., description="High-level instruction. The orchestrator infers steps and tools.") tool: Literal["analyze", "codereview", "debug", "testgen", "refactor", "tracer", "secaudit", "precommit", "chat", "kimi_multi_file_chat"] | None = Field( default=None, description="Optional tool to run (default inferred). When omitted, orchestrator picks based on prompt.", ) relevant_files: list[str] | None = Field( default=None, description="Optional absolute paths to relevant files. If not provided, the tool infers a small set.", ) step_budget: int = Field( default=2, description="Max steps to execute. v0.2 supports 1-3 steps depending on intent." ) # Optional pass-throughs for analyze analysis_type: Literal["architecture", "performance", "security", "quality", "general"] | None = Field( default=None, description="Optional analysis type to pass through to analyze (default general)." ) output_format: Literal["summary", "detailed", "actionable"] | None = Field( default=None, description="Optional output format to pass through to analyze (default step-based)." ) # Optional pass-throughs for codereview review_type: Literal["full", "security", "performance", "quick"] | None = Field( default=None, description="Optional codereview review_type when tool='codereview' (default full)." ) focus_on: Optional[str] = Field( default=None, description="Optional codereview focus area when tool='codereview'." ) dry_run: bool = Field(default=False, description="If true, plan only; do not execute underlying tools.") class OrchestrateAutoTool(BaseTool): """v0.2 Orchestrator that infers intent and executes a small multi-step plan. Behavior: - Builds a minimal plan to run the inferred tool for up to 3 steps - Auto-fills model and file context; auto-enables websearch when useful - Emits consolidated summary and guidance """ def get_name(self) -> str: return "orchestrate_auto" def get_description(self) -> str: return ( "AUTO-ORCHESTRATOR v0.2 - Given a natural prompt, infer intent, create a short plan, and execute " "tools with required fields auto-filled. Supports multi-tool routing (lightweight heuristics), " "up to 3 steps, auto websearch opt-in, and file-extraction pre-steps when appropriate." ) def get_system_prompt(self) -> str: # Not used for MVP (we do not call an LLM within this tool yet) return "" def requires_model(self) -> bool: # The tool itself does not call a model directly in the MVP return False def get_request_model(self): return OrchestrateRequest def get_input_schema(self) -> dict[str, Any]: # Provide optional model for downstream tools; respect auto-mode requirements return { "type": "object", "properties": { "user_prompt": { "type": "string", "description": "High-level instruction. The orchestrator infers steps and tools.", }, "relevant_files": { "type": "array", "items": {"type": "string"}, "description": "Optional absolute paths to relevant files. If not provided, the tool infers a small set.", }, "step_budget": { "type": "integer", "minimum": 1, "maximum": 3, "default": 2, "description": "Max steps to execute (1-3). v0.2 enables up to 3 with actionable step.", }, "dry_run": { "type": "boolean", "default": False, "description": "If true, plan only; do not execute underlying tools.", }, # Optional pass-throughs for analyze tool "analysis_type": { "type": "string", "enum": ["architecture", "performance", "security", "quality", "general"], "description": "Optional analysis type to pass through to analyze (default general)", }, "output_format": { "type": "string", "enum": ["summary", "detailed", "actionable"], "description": "Optional output format to pass through to analyze (default step-based)", }, # Let callers optionally override model; tool will fallback to env default "model": self.get_model_field_schema(), "continuation_id": { "type": "string", "description": "Continuation ID for conversation carry-over across orchestrated steps.", }, }, "required": ["user_prompt"] + (["model"] if self.is_effective_auto_mode() else []), } async def prepare_prompt(self, request) -> str: """Satisfy BaseTool's abstract interface. MVP orchestrator does not call a model directly; return a minimal synthesized string using the provided user prompt when available. """ try: user_prompt = getattr(request, "user_prompt", None) if user_prompt is None and isinstance(request, dict): user_prompt = request.get("user_prompt") except Exception: user_prompt = None user_prompt = (user_prompt or "").strip() return f"OrchestrateAuto v0.2 request: {user_prompt}" if user_prompt else "OrchestrateAuto v0.2 request" # --------------------- # Execution # --------------------- async def execute(self, arguments: dict[str, Any]) -> list: """Plan and (optionally) execute a two-step analyze workflow. Returns a single consolidated text content describing the plan, any execution results, and suggested next steps. """ # Validate request RequestModel = self.get_request_model() try: req = RequestModel.model_validate(arguments) except Exception as e: return [ { "type": "text", "text": f"Invalid request for orchestrate_auto: {type(e).__name__}: {e}", } ] # Resolve model for downstream tools model_name = arguments.get("model") or os.getenv("ORCHESTRATOR_DEFAULT_MODEL") or "glm-4.5-flash" # Infer relevant files if not provided relevant_files = self._ensure_abs_paths(req.relevant_files) if req.relevant_files else self._infer_relevant_files() # Descriptor-driven validation of first-step requirements for selected tool selected_tool = (arguments.get("tool") or req.tool or "analyze").strip().lower() registry = ToolRegistry() registry.build_tools() descriptors = {} try: descriptors = registry.list_descriptors() except Exception: descriptors = {} first_step_reqs = [] try: first_step_reqs = ( descriptors.get(selected_tool, {}) .get("annotations", {}) .get("first_step_required_fields", []) ) except Exception: first_step_reqs = [] # Validate minimally required context for first step based on descriptor if "relevant_files" in first_step_reqs and not relevant_files: guidance = ( "OrchestrateAuto could not infer any relevant files. Please provide at least one FULL absolute path " "to a file or directory under your repository in the 'relevant_files' parameter. Example: \n" f" - {os.path.join(os.getcwd(), 'zen-mcp-server', 'server.py')}\n" f" - {os.path.join(os.getcwd(), 'zen-mcp-server')} (directory)" ) return [{"type": "text", "text": guidance}] # Select tool: explicit or heuristic (v0.2) selected_tool = (arguments.get("tool") or req.tool or "").strip().lower() if not selected_tool: selected_tool = self._infer_tool_from_prompt(req.user_prompt) if selected_tool not in {"analyze", "codereview", "debug", "testgen", "refactor", "tracer", "secaudit", "precommit", "chat"}: selected_tool = "analyze" # Descriptors already loaded above; reuse for plan construction # (avoid redundant registry.build_tools()/list_descriptors() calls) # Build simple plan: run selected workflow (multi-tool aware) for up to 3 steps plan = [ { "tool": selected_tool, "step_number": 1, "description": f"Kick off {selected_tool} with specified/inferred files. Report goals and plan.", } ] if req.step_budget >= 2: plan.append( { "tool": selected_tool, "step_number": 2, "description": f"Summarize concrete findings and early recommendations ({selected_tool}).", } ) if req.step_budget >= 3 and selected_tool in {"analyze", "codereview", "secaudit", "debug"}: plan.append( { "tool": selected_tool, "step_number": 3, "description": f"Actionable next steps or quick fixes ({selected_tool}).", } ) # Dry run: return only plan preview (with basic metadata) if req.dry_run: return [ { "type": "text", "text": self._format_plan_only(req.user_prompt, model_name, relevant_files, plan), "status": "plan_preview", "next_steps": "Run with dry_run=false to execute the plan.", "continuation_id": arguments.get("continuation_id"), } ] # Auto pre-step: file extract via Kimi if prompt implies and allowed if self._should_do_file_extract(req.user_prompt, relevant_files): try: from tools.kimi_upload import KimiUploadAndExtractTool up = KimiUploadAndExtractTool() # As we don't have paths yet, this is a guided message to user to attach files next # Keep it conservative: we only inform; do not block flow guidance_msg = "Consider using kimi_upload_and_extract to ingest your files; orchestrator detected a file-oriented request." except Exception: guidance_msg = None else: guidance_msg = None # Execute the plan via ToolRegistry registry = ToolRegistry() registry.build_tools() outputs: list[str] = [] guidance: list[str] = [] cont_id = arguments.get("continuation_id") for step in plan: tool_name = step["tool"] try: tool_impl = registry.get_tool(tool_name) except Exception as e: outputs.append(f"Failed to load tool '{tool_name}': {e}") break step_num = step["step_number"] total_steps = max(step_num, min(req.step_budget, 2)) # Prepare minimal valid payloads for workflow tools (analyze, codereview) if step_num == 1: step_text = f"Start {tool_name}: {req.user_prompt}" args = { "model": model_name, "step": step_text, "step_number": 1, "total_steps": total_steps, "next_step_required": total_steps > 1, "findings": "Plan initial investigation and identify target files.", "relevant_files": relevant_files, "files_checked": [], } # Analyze-specific pass-throughs if tool_name == "analyze": args["analysis_type"] = arguments.get("analysis_type") or "general" args["output_format"] = arguments.get("output_format") or "summary" # Codereview-specific optional pass-throughs if tool_name == "codereview": if arguments.get("review_type"): args["review_type"] = arguments["review_type"] if arguments.get("focus_on"): args["focus_on"] = arguments["focus_on"] else: # step 2 args = { "model": model_name, "step": f"Report concrete findings and recommendations based on investigation ({tool_name}).", "step_number": 2, "total_steps": total_steps, "next_step_required": False, "findings": ( "Summarized findings: initial goals reviewed; files examined; next actions identified." ), "relevant_files": relevant_files, "files_checked": relevant_files, } if tool_name == "analyze": args["analysis_type"] = arguments.get("analysis_type") or "general" args["output_format"] = arguments.get("output_format") or "actionable" if cont_id: args["continuation_id"] = cont_id # Execute workflow step try: # Auto-enable websearch if intent suggests if "use_websearch" not in args: args["use_websearch"] = self._should_enable_websearch(req.user_prompt) result_chunks = await tool_impl.execute(args) # Render full output for visibility rendered = self._render_chunks(result_chunks) outputs.append(f"[{tool_name} step {step_num}]\n{rendered}") if guidance_msg: outputs.append(f"[hint] {guidance_msg}") # Try to parse JSON response to extract continuation_id and next_steps try: # Use the first text chunk as JSON payload first_text = None if result_chunks: ch0 = result_chunks[0] if isinstance(ch0, dict): first_text = ch0.get("text") else: first_text = getattr(ch0, "text", None) if first_text: data = json.loads(first_text) # Capture guidance ns = data.get("next_steps") if ns: guidance.append(f"Step {step_num}: {ns}") # Update continuation id for subsequent steps when not provided initially cid = data.get("continuation_id") if cid: cont_id = cid except Exception: # Non-JSON or unexpected structure; skip extraction but continue pass except Exception as e: outputs.append(f"{tool_name} step {step_num} failed: {type(e).__name__}: {e}") break final_text = self._format_consolidated_output( req.user_prompt, model_name, relevant_files, plan, outputs, guidance ) return [{ "type": "text", "text": final_text, "status": "orchestration_complete", "next_steps": "Review results and consider next steps.", "continuation_id": cont_id, }] def _infer_tool_from_prompt(self, text: str) -> str: """Heuristic intent routing for v0.2 (no model call, safe). Returns one of: analyze, codereview, debug, testgen, refactor, tracer, secaudit, precommit, chat """ t = (text or "").lower() # Very lightweight patterns; prefer conservative routing to analyze if any(k in t for k in ["bug", "error", "traceback", "stacktrace", "failure", "repro"]): return "debug" if any(k in t for k in ["test", "unit test", "pytest", "coverage", "mock"]): return "testgen" if any(k in t for k in ["refactor", "rename", "extract method", "clean up", "cleanup"]): return "refactor" if any(k in t for k in ["security", "owasp", "vuln", "xss", "sql injection", "secrets"]): return "secaudit" if any(k in t for k in ["review", "code review", "diff", "pull request", "pr review"]): return "codereview" if any(k in t for k in ["trace", "call chain", "entry point", "dependencies"]): return "tracer" if any(k in t for k in ["precommit", "pre-commit", "pre commit", "commit checks"]): return "precommit" if any(k in t for k in ["chat", "ask", "explain", "summarize"]): return "chat" return "analyze" # --------------------- # Helpers # --------------------- def _should_enable_websearch(self, text: str) -> bool: t = (text or "").lower() if os.getenv("EX_WEBSEARCH_ENABLED", "true").lower() == "false": return False # Default on if EX_WEBSEARCH_DEFAULT_ON=true default_on = os.getenv("EX_WEBSEARCH_DEFAULT_ON", "false").lower() == "true" keywords = ["latest", "news", "docs", "documentation", "today", "current", "update", "release", "changelog", "how to", "api"] return default_on or any(k in t for k in keywords) def _should_do_file_extract(self, text: str, files: list[str] | None) -> bool: if files: return False t = (text or "").lower() return any(k in t for k in ["pdf", "doc", "docx", "ppt", "pptx", "xlsx", "csv", "upload", "file"]) def _ensure_abs_paths(self, paths: list[str]) -> list[str]: """Validate and normalize paths to absolute within project root.""" abs_list: list[str] = [] project_root = os.path.abspath(os.getcwd()) for p in paths: if not p: continue abs_p = os.path.abspath(p) # Restrict to project boundary and existing paths only if abs_p.startswith(project_root) and os.path.exists(abs_p): abs_list.append(abs_p) return abs_list def _infer_relevant_files(self) -> list[str]: """Very conservative file inference: prefer core Zen files if present. Returns up to 3 absolute paths. """ project_root = os.path.abspath(os.getcwd()) candidates = [ os.path.join(project_root, "zen-mcp-server", "server.py"), os.path.join(project_root, "zen-mcp-server", "pyproject.toml"), os.path.join(project_root, "examples", "end_to_end_demo.py"), ] inferred = [p for p in candidates if os.path.exists(p)] if inferred: return inferred[:3] # Fallback: first few Python files under zen-mcp-server (bounded to project root) root = os.path.join(project_root, "zen-mcp-server") picks: list[str] = [] for dirpath, _, filenames in os.walk(root): # Ensure we stay within project boundary if os.path.commonpath([project_root, dirpath]) != project_root: continue for fn in filenames: if fn.endswith(".py"): full = os.path.join(dirpath, fn) picks.append(full) if len(picks) >= 3: return picks return picks def _render_chunks(self, chunks: list) -> str: parts: list[str] = [] for ch in chunks or []: try: if isinstance(ch, dict) and ch.get("type") == "text": parts.append(str(ch.get("text", ""))) else: # Fallback for other content formats (e.g., TextContent) txt = getattr(ch, "text", None) parts.append(str(txt) if txt is not None else str(ch)) except Exception: parts.append(str(ch)) return "\n".join([p for p in parts if p]) def _format_plan_only(self, user_prompt: str, model_name: str, files: list[str], plan: list[dict]) -> str: plan_lines = [ "Planned steps (v0.2):", f"- {plan[0]['tool']} step 1 (files={len(files)})" if plan else "- analyze step 1", ] if any(s.get("step_number") == 2 for s in plan): plan_lines.append(f"- {plan[0]['tool']} step 2 (finalize)" if plan else "- analyze step 2 (finalize)") return ( f"OrchestrateAuto (v0.2)\nRequest: {user_prompt}\nModel: {model_name}\nFiles:\n" + "\n".join(f" - {f}" for f in files) + "\n\n" + "\n".join(plan_lines) ) def _format_consolidated_output( self, user_prompt: str, model_name: str, files: list[str], plan: list[dict], outputs: list[str], guidance: list[str] | None = None, ) -> str: header = [ "=== ORCHESTRATION SUMMARY (MVP) ===", f"Prompt: {user_prompt}", f"Model: {model_name}", "Files:", *[f" - {f}" for f in files], "", "Plan:", *[f" - {s['tool']} step {s['step_number']}: {s['description']}" for s in plan], "", "Execution Results:", ] body = [] for out in outputs: body.append(out) body.append("") # Include guidance extracted from analyze tool (next_steps / required actions) guidance = guidance or [] if guidance: body.append("Required Actions / Next Steps from Analyze:") body.extend([f"- {g}" for g in guidance]) body.append("") footer = [ "Next: Expand orchestrator to multi-tool routing (testgen, tracer) using descriptors,", "add model-driven planning and cost-aware tool selection.", ] return "\n".join(header + body + footer)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Zazzles2908/EX_AI-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server