Skip to main content
Glama
gemini.pyโ€ข2.74 kB
"""Gemini CLI adapter.""" import asyncio import subprocess from typing import Any from .base import CLIAdapter class GeminiAdapter(CLIAdapter): """Adapter for Gemini CLI.""" async def execute(self, task: str, progress_callback: Any = None, timeout: int | None = None, **kwargs: Any) -> tuple[str, str, int]: """Execute task using Gemini CLI with optional streaming.""" # Use streaming method from base class if progress_callback provided if progress_callback: return await self.execute_streaming(task, progress_callback, timeout, **kwargs) # Otherwise use buffered execution (legacy) cmd = self.format_task(task, **kwargs) resolved_cmd = self.resolve_command(cmd) process = await asyncio.create_subprocess_exec( *resolved_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env={**subprocess.os.environ, **self.get_env()}, ) try: effective_timeout = timeout or self.get_timeout() stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=effective_timeout ) return ( stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace"), process.returncode or 0, ) except asyncio.TimeoutError: process.kill() await process.wait() raise TimeoutError(f"Gemini CLI timed out after {effective_timeout}s") def validate(self) -> bool: """Validate Gemini CLI is available.""" try: subprocess.run( ["which", "gemini"] if subprocess.os.name != "nt" else ["where", "gemini"], capture_output=True, check=True, ) return True except subprocess.CalledProcessError: return False def format_task(self, task: str, **kwargs: Any) -> list[str]: """Format task for Gemini CLI.""" cmd = ["gemini"] # Add model if specified in kwargs or config if model := kwargs.get("model") or self.config.get("model"): cmd.extend(["-m", model]) # Add allowed tools if specified if tools := kwargs.get("allowed_tools"): tool_list = tools if isinstance(tools, str) else ",".join(tools) cmd.extend(["--allowed-tools", tool_list]) # Add any custom args from config for arg in self.get_args(): if arg not in cmd: # Avoid duplicates cmd.append(arg) # Add task as final argument (quoted) cmd.append(task) return cmd

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/carlosduplar/multi-agent-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server