"""
Auggie-optimized wrapper tools for Zen MCP Server.
These wrappers provide CLI-friendly output, progress indicators, error handling,
and plug into the registry's category-aware fallback and telemetry.
Wrappers are safe no-ops when Auggie config is not present. They attempt to
integrate with optional auggie.session and auggie.templates modules when available.
Functions (async):
- aug_chat(args: dict) -> str
- aug_thinkdeep(args: dict) -> str
- aug_consensus(args: dict) -> str
"""
from __future__ import annotations

import asyncio
import json
import logging
import os
from typing import Any, Optional

from src.providers.base import ModelResponse
from src.providers.registry import ModelProviderRegistry as Registry

from auggie.selector import select_model

# Note: auggie.templates is deliberately NOT imported at module level; it is an
# optional dependency and is imported lazily in _render_template_if_available.
try:
    from auggie.config import get_auggie_settings
except Exception:  # pragma: no cover

    def get_auggie_settings():
        return {}


logger = logging.getLogger(__name__)

# --- Optional session integration (file- or redis-backed; implemented in Phase 1, Step 4) ---
try:
    from auggie.session import get_session as _sess_get, update_session as _sess_update, append_history as _sess_append
except Exception:  # pragma: no cover

    def _sess_get(sid: Optional[str]):
        return {}

    def _sess_update(sid: Optional[str], **kv):
        return {}

    def _sess_append(sid: Optional[str], entry: dict):
        return {}

# --- Optional templates integration ---
def _render_template_if_available(name: Optional[str], variables: dict[str, Any], default: str) -> str:
    """Render the named template when auggie.templates is importable; otherwise return default."""
    if not name:
        return default
    try:
        from auggie.templates import render_template  # type: ignore

        return render_template(name, variables)
    except Exception:
        return default
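
# Example (illustrative; "chat_basic" is the default template name used by
# aug_chat below):
#
#     text = _render_template_if_available("chat_basic", {"prompt": "hi"}, "hi")
#     # -> rendered template when auggie.templates is importable, else "hi"
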
# --- CLI formatting helpers ---
def _format_cli(data: dict[str, Any], compact: bool = True) -> str:
    """Format a small, structured CLI output."""
    if compact:
        # Compact: single JSON line for machine-friendly consumption
        try:
            return json.dumps(data, ensure_ascii=False)
        except Exception:
            pass
    # Pretty text fallback
    parts = []
    if m := data.get("model"):
        parts.append(f"[Model] {m}")
    if p := data.get("provider"):
        parts.append(f"[Provider] {p}")
    if lat := data.get("latency_ms"):
        parts.append(f"[Latency] {lat} ms")
    if usage := data.get("usage"):
        it = usage.get("input_tokens", 0)
        ot = usage.get("output_tokens", 0)
        tt = usage.get("total_tokens", it + ot)
        parts.append(f"[Tokens] in={it} out={ot} total={tt}")
    if pr := data.get("progress"):
        parts.append("[Progress]\n- " + "\n- ".join(pr))
    if err := data.get("error"):
        parts.append(f"[Error] {err}")
    if content := data.get("content"):
        parts.append("[Output]\n" + content)
    return "\n".join(parts)
def _get_wrapper_settings() -> dict[str, Any]:
    s = get_auggie_settings() or {}
    w = s.get("wrappers") or {}
    return {
        "show_progress": bool(w.get("show_progress", True)),
        "compact_output": bool(w.get("compact_output", True)),
        "error_detail": str(w.get("error_detail", "detailed")),
    }
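
# Assumed settings shape consumed above (a sketch; any missing key falls back
# to the defaults shown in _get_wrapper_settings):
#
#     {"wrappers": {"show_progress": True, "compact_output": False, "error_detail": "detailed"}}
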
# --- Compact activity log streamer (optional for CLI/VS Code) ---
async def _stream_activity_tail(stop_event: asyncio.Event, pattern: Optional[str] = None) -> None:
    """Continuously tail the activity log and emit lines via the progress helper.

    Emits only new lines, filters with an optional regex, and stops when
    stop_event is set.
    """
    try:
        import re
        from pathlib import Path

        from utils.progress import send_progress

        project_root = Path(__file__).resolve().parents[1]
        log_path = (project_root / "logs" / "mcp_activity.log").resolve()
        if not log_path.exists():
            return
        compiled = re.compile(pattern) if pattern else None
        # Seek to near end so only fresh activity is emitted
        with log_path.open("r", encoding="utf-8", errors="ignore") as f:
            f.seek(0, 2)
            while not stop_event.is_set():
                pos = f.tell()
                line = f.readline()
                if not line:
                    await asyncio.sleep(0.5)
                    f.seek(pos)
                    continue
                if compiled and not compiled.search(line):
                    continue
                # Emit compact line
                send_progress(line.rstrip("\n"))
    except Exception:
        # Fail silently so streaming issues never impact tool execution
        pass
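
# Example (illustrative): run the tail alongside other work and stop it when done:
#
#     stop = asyncio.Event()
#     task = asyncio.create_task(_stream_activity_tail(stop, pattern=r"PROGRESS"))
#     ...  # await the actual tool call here
#     stop.set()
#     await task
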
def _should_stream_activity() -> bool:
    """Gate the streamer via env flags.

    On by default (STREAM_PROGRESS defaults to "true"); AUGGIE_CLI="true"
    forces streaming on even when STREAM_PROGRESS is disabled.
    """
    try:
        return (
            os.getenv("AUGGIE_CLI", "").strip().lower() == "true"
            or os.getenv("STREAM_PROGRESS", "true").strip().lower() == "true"
        )
    except Exception:
        return False
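
# Example (illustrative): STREAM_PROGRESS defaults to on; setting it to "false"
# disables tailing unless AUGGIE_CLI forces it back on:
#
#     os.environ["STREAM_PROGRESS"] = "false"
#     assert _should_stream_activity() is False
#     os.environ["AUGGIE_CLI"] = "true"
#     assert _should_stream_activity() is True
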
# --- Core execution helper using fallback ---
async def _exec_with_fallback(
    category: Any,
    prompt: str,
    model: Optional[str],
    temperature: Optional[float] = None,
    system_prompt: Optional[str] = None,
    extra: Optional[dict] = None,
) -> tuple[Optional[ModelResponse], str, list[str]]:
    """Execute a model call via registry fallback.

    Returns: (response, selected_model_name, progress)
    """
    settings = _get_wrapper_settings()
    progress: list[str] = []
    if settings["show_progress"]:
        progress.append("Resolving model and calling provider...")

    # Prepare call fn for registry
    def call_fn(selected_model: str) -> ModelResponse:
        provider = Registry.get_provider_for_model(selected_model)
        if provider is None:
            raise RuntimeError(f"No provider for model {selected_model}")
        kwargs = {}
        if temperature is not None:
            kwargs["temperature"] = float(temperature)
        if system_prompt:
            kwargs["system_prompt"] = system_prompt
        if extra:
            kwargs.update(extra)
        return provider.generate_content(prompt=prompt, model_name=selected_model, **kwargs)

    # If a specific model is requested, temporarily pin the fallback chain to
    # that single model so execution still flows through call_with_fallback
    # and its telemetry, then restore the original chain.
    if model and model.lower() != "auto":
        orig_chain = Registry._auggie_fallback_chain
        try:
            Registry._auggie_fallback_chain = classmethod(lambda cls, c, h=None: [model])  # type: ignore
            resp = Registry.call_with_fallback(category, call_fn)
            if settings["show_progress"]:
                progress.append(f"Response received from {model}")
            return resp, model, progress
        finally:
            Registry._auggie_fallback_chain = orig_chain  # type: ignore

    # Auto or unspecified: use the full fallback chain
    resp = Registry.call_with_fallback(category, call_fn)
    chosen = getattr(resp, "model_name", "") or "auto"
    if settings["show_progress"]:
        progress.append(f"Response received from {chosen}")
    return resp, chosen, progress
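
# Example (illustrative sketch; "gpt-x" is a hypothetical model name and Cat is
# tools.models.ToolModelCategory):
#
#     resp, chosen, progress = await _exec_with_fallback(
#         Cat.FAST_RESPONSE, "Hello", "gpt-x", temperature=0.2
#     )
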
# --- Public wrappers ---
async def aug_chat(args: dict) -> str:
    """Auggie-optimized chat wrapper (FAST_RESPONSE category)."""
    from tools.models import ToolModelCategory as Cat

    ws = _get_wrapper_settings()
    session_id = args.get("session_id") or args.get("continuation_id")
    prompt = args.get("prompt", "").strip()
    template = args.get("template")
    model = args.get("model", "auto")
    temperature = args.get("temperature")

    # Choose the default template when templates are configured to auto_use
    ws_all = get_auggie_settings() or {}
    tconf = ws_all.get("templates") or {}
    auto_use = bool(tconf.get("auto_use", True))
    if auto_use and not template:
        template = "chat_basic"
    final_prompt = _render_template_if_available(template, {"prompt": prompt, "session_id": session_id}, prompt)

    # Selector: explainable choice; execution still goes through fallback.
    # The explanation is folded into progress after the call, once the
    # progress list exists.
    sel_reason = None
    if (model or "auto").lower() == "auto":
        sel_model, sel_reason = select_model("FAST_RESPONSE", {"code": False})
        if sel_model:
            model = sel_model

    # Start the compact activity tail if enabled
    streamer_stop = asyncio.Event()
    streamer_task = None
    try:
        if _should_stream_activity():
            streamer_task = asyncio.create_task(
                _stream_activity_tail(streamer_stop, pattern=r"PROGRESS|TOOL_CALL|TOOL_COMPLETED")
            )
    except Exception:
        streamer_task = None

    try:
        resp, chosen, progress = await _exec_with_fallback(Cat.FAST_RESPONSE, final_prompt, model, temperature)
        if ws["show_progress"] and model and model != "auto":
            note = f"Selector chose: {model}"
            if sel_reason and (ws_all.get("selector") or {}).get("explanations", True):
                note = f"{note} ({sel_reason})"
            progress.insert(0, note)
        usage = getattr(resp, "usage", {}) if resp else {}
        data = {
            "tool": "aug_chat",
            "model": chosen,
            "content": resp.content if resp else "",
            "usage": usage,
            "progress": progress if ws["show_progress"] else [],
        }
        _sess_update(session_id, last_model=chosen)
        _sess_append(session_id, {"tool": "aug_chat", "model": chosen, "prompt": final_prompt, "content": data["content"]})
        return _format_cli(data, compact=ws["compact_output"])
    except Exception as e:
        data = {
            "tool": "aug_chat",
            "error": str(e),
            "progress": ["retry suggested: change model or simplify prompt"] if ws["show_progress"] else [],
        }
        return _format_cli(data, compact=ws["compact_output"])
    finally:
        try:
            streamer_stop.set()
            if streamer_task:
                await asyncio.wait_for(streamer_task, timeout=1.0)
        except Exception:
            pass
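
# Example (illustrative; "gpt-x" and "s1" are hypothetical): pinning a model
# skips the selector but still routes through the fallback/telemetry path:
#
#     out = await aug_chat({"prompt": "Explain X", "model": "gpt-x", "session_id": "s1"})
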
async def aug_thinkdeep(args: dict) -> str:
    """Auggie-optimized thinkdeep wrapper (EXTENDED_REASONING category)."""
    from config import DEFAULT_THINKING_MODE_THINKDEEP as DEFAULT_THINKING
    from tools.models import ToolModelCategory as Cat

    ws = _get_wrapper_settings()
    ws_all = get_auggie_settings() or {}
    session_id = args.get("session_id") or args.get("continuation_id")
    prompt = args.get("step") or args.get("prompt") or ""
    template = args.get("template")
    model = args.get("model", "auto")
    # Map thinking_mode to provider extra kwargs when supported (placeholder)
    thinking_mode = args.get("thinking_mode", DEFAULT_THINKING)
    final_prompt = _render_template_if_available(
        template, {"prompt": prompt, "session_id": session_id, "thinking_mode": thinking_mode}, str(prompt)
    )

    sel_reason = None
    if (model or "auto").lower() == "auto":
        sel_model, sel_reason = select_model("EXTENDED_REASONING", {"code": False})
        if sel_model:
            model = sel_model

    # Start the compact activity tail if enabled
    streamer_stop = asyncio.Event()
    streamer_task = None
    try:
        if _should_stream_activity():
            streamer_task = asyncio.create_task(
                _stream_activity_tail(streamer_stop, pattern=r"PROGRESS|TOOL_CALL|TOOL_COMPLETED")
            )
    except Exception:
        streamer_task = None

    try:
        resp, chosen, progress = await _exec_with_fallback(
            Cat.EXTENDED_REASONING, final_prompt, model, extra={"thinking_mode": thinking_mode}
        )
        if ws["show_progress"] and model and model != "auto":
            note = f"Selector chose: {model}"
            if sel_reason and (ws_all.get("selector") or {}).get("explanations", True):
                note = f"{note} ({sel_reason})"
            progress.insert(0, note)
        usage = getattr(resp, "usage", {}) if resp else {}
        data = {
            "tool": "aug_thinkdeep",
            "model": chosen,
            "content": resp.content if resp else "",
            "usage": usage,
            "progress": (["Thinking mode: " + str(thinking_mode)] + progress) if ws["show_progress"] else [],
        }
        _sess_update(session_id, last_model=chosen)
        _sess_append(
            session_id,
            {"tool": "aug_thinkdeep", "model": chosen, "prompt": final_prompt, "thinking_mode": thinking_mode, "content": data["content"]},
        )
        return _format_cli(data, compact=ws["compact_output"])
    except Exception as e:
        data = {
            "tool": "aug_thinkdeep",
            "error": str(e),
            "progress": ["retry suggested: lower thinking_mode or switch model"] if ws["show_progress"] else [],
        }
        return _format_cli(data, compact=ws["compact_output"])
    finally:
        try:
            streamer_stop.set()
            if streamer_task:
                await asyncio.wait_for(streamer_task, timeout=1.0)
        except Exception:
            pass
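
# Example (illustrative; "high" is an assumed thinking_mode value): the mode is
# forwarded to the provider via the extra kwargs of _exec_with_fallback:
#
#     out = await aug_thinkdeep({"step": "Analyze tradeoffs", "thinking_mode": "high"})
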
async def aug_consensus(args: dict) -> str:
    """Auggie-optimized consensus wrapper: compare two models and synthesize.

    If 'models' is not provided, pick the first two models from the
    EXTENDED_REASONING fallback chain.
    """
    from tools.models import ToolModelCategory as Cat

    ws = _get_wrapper_settings()
    session_id = args.get("session_id") or args.get("continuation_id")
    prompt = args.get("step") or args.get("prompt") or ""
    template = args.get("template")

    # Normalize the models argument to a simple list[str]
    raw_models = args.get("models") or []
    models: list[str] = []
    try:
        for item in raw_models:
            if isinstance(item, str):
                models.append(item)
            elif isinstance(item, dict):
                cand = item.get("model") or item.get("name") or item.get("id")
                if cand:
                    models.append(str(cand))
    except Exception:
        models = []

    final_prompt = _render_template_if_available(template, {"prompt": prompt, "session_id": session_id}, str(prompt))

    # Build candidate models if not specified
    if not models:
        try:
            chain = Registry._auggie_fallback_chain(Cat.EXTENDED_REASONING)
            # Choose the first two distinct models
            seen = set()
            models = []
            for m in chain:
                if m not in seen:
                    seen.add(m)
                    models.append(m)
                if len(models) >= 2:
                    break
        except Exception:
            models = []
    chosen_models = models[:2] if models else []
    if not chosen_models:
        # Fall back to the best single model
        chosen_models = [Registry.get_preferred_fallback_model(Cat.EXTENDED_REASONING)]

    results: list[dict[str, Any]] = []
    progress: list[str] = ["Comparing models..."] if ws["show_progress"] else []

    # Start the compact activity tail if enabled
    streamer_stop = asyncio.Event()
    streamer_task = None
    try:
        if _should_stream_activity():
            streamer_task = asyncio.create_task(
                _stream_activity_tail(streamer_stop, pattern=r"PROGRESS|TOOL_CALL|TOOL_COMPLETED")
            )
    except Exception:
        streamer_task = None

    for m in chosen_models:
        try:
            # Pin each run to model m; routing through fallback keeps telemetry recording
            resp, chosen, _ = await _exec_with_fallback(Cat.EXTENDED_REASONING, final_prompt, m)
            usage = getattr(resp, "usage", {}) if resp else {}
            results.append({"model": chosen, "content": resp.content if resp else "", "usage": usage})
            if ws["show_progress"]:
                progress.append(f"Received response from {chosen}")
        except Exception as e:
            results.append({"model": m, "error": str(e)})
            if ws["show_progress"]:
                progress.append(f"Error from {m}: {e}")

    # Stop the streamer before synthesis to flush remaining lines
    try:
        streamer_stop.set()
        if streamer_task:
            await asyncio.wait_for(streamer_task, timeout=1.0)
    except Exception:
        pass

    # Simple synthesis placeholder: prefer the first content, append a note about the second
    synthesis = ""
    if results:
        synthesis = (results[0].get("content") or "").strip()
        if len(results) > 1:
            other_model = str(results[1].get("model"))
            synthesis += f"\n\n---\nConsensus note: Compared against {other_model}"

    data = {
        "tool": "aug_consensus",
        "models": [r.get("model") for r in results],
        "results": results,
        "synthesis": synthesis,
        "progress": progress if ws["show_progress"] else [],
    }
    return _format_cli(data, compact=ws["compact_output"])
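
# Example (illustrative; model names are hypothetical): entries in "models" may
# be plain strings or dicts carrying a "model"/"name"/"id" key:
#
#     out = await aug_consensus({"prompt": "Pick an approach", "models": ["m1", {"name": "m2"}]})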