"""Utility functions for grok-cli-mcp."""
from __future__ import annotations
import asyncio
import json
import os
import shutil
from typing import Any, Optional, Sequence
from mcp.server.fastmcp.server import Context
from .types import GrokMessage, GrokParsedOutput
# Constants
DEFAULT_GROK_BIN = "/opt/homebrew/bin/grok"
ENV_GROK_CLI_PATH = "GROK_CLI_PATH"
ENV_GROK_API_KEY = "GROK_API_KEY"
def _resolve_grok_path() -> str:
"""
Resolve the path to the Grok CLI binary.
Priority: explicit env override -> default homebrew path -> PATH lookup
"""
candidates: list[Optional[str]] = [
os.environ.get(ENV_GROK_CLI_PATH),
DEFAULT_GROK_BIN,
shutil.which("grok"),
]
for c in candidates:
if c and os.path.exists(c):
return c
# Return last candidate even if None; caller handles error
return candidates[-1] or "grok"
def _require_api_key() -> str:
"""
Validate that GROK_API_KEY environment variable is set.
Raises:
ValueError: If GROK_API_KEY is not set.
Returns:
The API key value.
"""
api_key = os.environ.get(ENV_GROK_API_KEY)
if not api_key:
raise ValueError(
f"{ENV_GROK_API_KEY} is not set. Please export your Grok API key in the environment."
)
return api_key
def _extract_json_from_text(text: str) -> Any:
"""
Best-effort extractor for JSON payloads printed by the Grok CLI.
Supports:
- a single JSON object: {...}
- an array of JSON objects: [...]
If multiple non-JSON lines are mixed in, attempts to locate the last JSON blob.
Args:
text: CLI output text that may contain JSON.
Returns:
Parsed JSON object or array.
Raises:
ValueError: If no JSON-like content found in text.
"""
text = text.strip()
# Fast path: try full parse
try:
return json.loads(text)
except Exception:
pass
# Try to find the last JSON block by scanning from the end
# Look for last closing brace/bracket and match from first opening
last_obj = text.rfind("}")
last_arr = text.rfind("]")
last = max(last_obj, last_arr)
if last == -1:
raise ValueError("No JSON-like content found in CLI output.")
# Find first plausible start
first_obj = text.find("{")
first_arr = text.find("[")
first_candidates = [i for i in [first_obj, first_arr] if i != -1]
if not first_candidates:
raise ValueError("No JSON-like content found in CLI output.")
first = min(first_candidates)
snippet = text[first : last + 1].strip()
return json.loads(snippet)
def _collect_assistant_text(messages: Sequence[GrokMessage]) -> str:
"""
Collate assistant message text from a sequence of messages.
Handles:
- content as a plain string
- content as a list of blocks with 'type'=='text'
- content as a dict with 'text' field
Args:
messages: Sequence of GrokMessage objects.
Returns:
Concatenated text from all assistant messages.
"""
chunks: list[str] = []
for m in messages:
if m.role != "assistant":
continue
c = m.content
if isinstance(c, str):
chunks.append(c)
elif isinstance(c, list):
for block in c:
try:
if isinstance(block, dict) and block.get("type") == "text" and "text" in block:
chunks.append(str(block["text"]))
elif isinstance(block, dict) and "content" in block:
chunks.append(str(block["content"]))
except Exception:
continue
elif isinstance(c, dict) and "text" in c:
chunks.append(str(c["text"]))
else:
# Fallback: stringify structured content
try:
chunks.append(json.dumps(c, ensure_ascii=False))
except Exception:
chunks.append(str(c))
return "\n".join([s for s in (s.strip() for s in chunks) if s])
async def _run_grok(
prompt: str,
*,
model: Optional[str],
timeout_s: float,
ctx: Optional[Context] = None,
) -> GrokParsedOutput:
"""
Run Grok CLI in headless mode: `grok -p "<prompt>" [-m <model>]`
Parse JSON output and return a structured response.
Args:
prompt: The prompt to send to Grok.
model: Optional Grok model name (passed with -m if provided).
timeout_s: Process timeout in seconds.
ctx: Optional FastMCP context for logging.
Returns:
GrokParsedOutput with messages, model, and raw output.
Raises:
FileNotFoundError: If Grok CLI binary not found.
TimeoutError: If CLI execution exceeds timeout.
RuntimeError: If CLI exits with non-zero code.
"""
grok_bin = _resolve_grok_path()
if not shutil.which(grok_bin) and not os.path.exists(grok_bin):
raise FileNotFoundError(
f"Grok CLI not found. Checked {grok_bin} and PATH. "
f"Set {ENV_GROK_CLI_PATH} or install grok CLI."
)
_require_api_key()
args = [grok_bin, "-p", prompt]
if model:
# Only pass -m if caller supplied a model; if CLI rejects, the error will be caught
args += ["-m", model]
env = os.environ.copy()
# Ensure GROK_API_KEY is present in the subprocess environment
env[ENV_GROK_API_KEY] = env[ENV_GROK_API_KEY]
if ctx:
await ctx.info(f"Invoking Grok CLI {'with model ' + model if model else ''}...")
proc = await asyncio.create_subprocess_exec(
*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env
)
try:
stdout_b, stderr_b = await asyncio.wait_for(proc.communicate(), timeout=timeout_s)
except asyncio.TimeoutError:
try:
proc.kill()
except Exception:
pass
raise TimeoutError(f"Grok CLI timed out after {timeout_s:.0f}s")
stdout = (stdout_b or b"").decode("utf-8", errors="replace")
stderr = (stderr_b or b"").decode("utf-8", errors="replace")
if proc.returncode != 0:
# Grok CLI error; include stderr to help debugging
raise RuntimeError(f"Grok CLI failed (exit {proc.returncode}): {stderr.strip() or stdout.strip()}")
# Parse JSON payload
parsed: Any
try:
parsed = _extract_json_from_text(stdout)
except Exception as e:
# If JSON parse fails, provide raw output in a structured wrapper
if ctx:
await ctx.warning(f"Failed to parse Grok JSON output: {e}. Returning raw output.")
return GrokParsedOutput(messages=[], model=model, raw=stdout)
# Normalize to list of GrokMessage
messages: list[GrokMessage] = []
if isinstance(parsed, dict) and "role" in parsed and "content" in parsed:
messages = [GrokMessage(**parsed)]
elif isinstance(parsed, list):
# Either a list of messages or a list with one message
for item in parsed:
if isinstance(item, dict) and "role" in item and "content" in item:
messages.append(GrokMessage(**item))
elif isinstance(parsed, dict) and "messages" in parsed:
for item in parsed.get("messages", []) or []:
if isinstance(item, dict) and "role" in item and "content" in item:
messages.append(GrokMessage(**item))
else:
# Unknown shape: keep raw and empty messages
if ctx:
await ctx.warning("Unrecognized JSON shape from Grok CLI. Returning raw output.")
return GrokParsedOutput(messages=[], model=model, raw=stdout)
return GrokParsedOutput(messages=messages, model=model, raw=stdout)